feat(query): cherry picking shard level failover metrics #1860

Merged: 2 commits, Sep 27, 2024
@@ -7,6 +7,7 @@ import scala.jdk.CollectionConverters._

import com.typesafe.scalalogging.StrictLogging
import io.grpc.ManagedChannel
import kamon.Kamon

import filodb.coordinator.GrpcPlanDispatcher
import filodb.coordinator.ShardMapper
@@ -17,6 +18,9 @@ import filodb.grpc.GrpcCommonUtils
import filodb.query.{LabelNames, LabelValues, LogicalPlan, SeriesKeysByFilters}
import filodb.query.exec._

object HighAvailabilityPlanner {
final val FailoverCounterName = "single-cluster-failovers-materialized"
}
/**
* HighAvailabilityPlanner responsible for using underlying local planner and FailureProvider
* to come up with a plan that orchestrates query execution between multiple
@@ -48,6 +52,24 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
import QueryFailureRoutingStrategy._
import LogicalPlan._

// the legacy failover counter captures failovers where we send a PromQL query to the buddy
// cluster
val legacyFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "legacy")

// the full failover counter captures failovers where we materialize a plan locally but then
// send the entire plan to the buddy cluster for execution
val fullFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "full")

// the partial failover counter captures failovers where we materialize a plan locally and
// send some parts of it to the buddy cluster for execution
val partialFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "partial")

// HTTP endpoint is still mandatory as metadata queries still use it.
val remoteHttpEndpoint: String = queryConfig.remoteHttpEndpoint
.getOrElse(throw new IllegalArgumentException("remoteHttpEndpoint config needed"))
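
All three counters above, together with the shardUnavailable counter added to SingleClusterPlanner further down, share the single metric name defined in the HighAvailabilityPlanner companion object and differ only in their "cluster" and "type" tags, so each failover kind shows up as its own time series. Below is a minimal standalone sketch of that tagging scheme, assuming only kamon-core on the classpath; the object name and cluster value are placeholders for illustration, not project code.

import kamon.Kamon

object FailoverCounterSketch extends App {
  val metricName  = "single-cluster-failovers-materialized" // HighAvailabilityPlanner.FailoverCounterName
  val clusterName = "example-cluster"                       // placeholder value, not a real cluster

  // each distinct (cluster, type) tag combination becomes a separate series under the same metric
  val partial = Kamon.counter(metricName).withTag("cluster", clusterName).withTag("type", "partial")
  val full    = Kamon.counter(metricName).withTag("cluster", clusterName).withTag("type", "full")

  partial.increment()  // recorded only against type="partial"
  full.increment(3)    // counters also accept a delta
}
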
@@ -110,6 +132,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
qContext)
}
case route: RemoteRoute =>
legacyFailoverCounter.increment()
val timeRange = route.timeRange.get
val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// rootLogicalPlan can be different from queryParams.promQl
@@ -341,6 +364,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
localActiveShardMapper: ActiveShardMapper,
remoteActiveShardMapper: ActiveShardMapper
): ExecPlan = {
partialFailoverCounter.increment()
// it only makes sense to do local planning if we have at least 50% of shards running,
// as the query might otherwise overload the few shards we have left while doing second level
// aggregation. Generally, it is better to ship the entire query to the cluster that has more shards up
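
The threshold in the comment above can be read as a simple routing rule. A brief sketch of that rule follows, under the stated assumption that both clusters have the same total shard count; the object and method are hypothetical simplifications, not the planner's actual code.

object FailoverRoutingSketch {
  // prefer local planning with shard-level (partial) failover only when the local cluster
  // still has at least half of its shards up and is no less healthy than the buddy cluster;
  // otherwise ship the whole plan to the cluster with more shards up
  def preferLocalPlanning(localShardsUp: Int, remoteShardsUp: Int, totalShards: Int): Boolean = {
    val localFractionUp = localShardsUp.toDouble / totalShards
    localFractionUp >= 0.5 && localShardsUp >= remoteShardsUp
  }
}
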
@@ -360,6 +384,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
buddyGrpcEndpoint = Some(remoteGrpcEndpoint.get)
)
val context = qContext.copy(plannerParams = haPlannerParams)
logger.info(context.getQueryLogLine("Using shard level failover"))
val plan = localPlanner.materialize(logicalPlan, context)
plan
}
@@ -419,6 +444,7 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
qContext: QueryContext,
localActiveShardMapper: ActiveShardMapper, remoteActiveShardMapper: ActiveShardMapper
): GenericRemoteExec = {
fullFailoverCounter.increment()
val timeout: Long = queryConfig.remoteHttpTimeoutMs.getOrElse(60000)
val plannerParams = qContext.plannerParams.copy(
failoverMode = filodb.core.query.ShardLevelFailoverMode,
@@ -65,6 +65,12 @@ class SingleClusterPlanner(val dataset: Dataset,
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref

// the failed failover counter captures failovers that are not possible because at least one shard
// is down on both the primary and DR clusters; in that case the query executes only if
// partial results are acceptable, otherwise an exception is thrown
val shardUnavailableFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)
.withTag("cluster", clusterName)
.withTag("type", "shardUnavailable")

val numPlansMaterialized = Kamon.counter("single-cluster-plans-materialized")
.withTag("cluster", clusterName)
@@ -192,10 +198,12 @@ class SingleClusterPlanner(val dataset: Dataset,
if (!shardInfo.active) {
if (queryContext.plannerParams.allowPartialResults)
logger.debug(s"Shard: $shard is not available however query is proceeding as partial results is enabled")
else
else {
shardUnavailableFailoverCounter.increment()
throw new filodb.core.query.ServiceUnavailableException(
s"Remote Buddy Shard: $shard is not available"
)
}
}
val dispatcher = RemoteActorPlanDispatcher(shardInfo.address, clusterName)
dispatcher
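
Taken together, the two planners now record four variants of the same failover metric, distinguished by the "type" tag. A short consolidating sketch follows; the enumeration is purely illustrative and not part of the codebase, which records these as plain string tag values.

object FailoverTypeSketch {
  sealed trait FailoverKind { def tag: String }
  // PromQL query re-sent to the buddy cluster (legacy route-based failover)
  case object Legacy extends FailoverKind { val tag = "legacy" }
  // whole locally materialized plan shipped to the buddy cluster
  case object Full extends FailoverKind { val tag = "full" }
  // plan materialized locally, with only some shards' work sent to the buddy cluster
  case object Partial extends FailoverKind { val tag = "partial" }
  // failover impossible: a shard is down on both the primary and DR clusters
  case object ShardUnavailable extends FailoverKind { val tag = "shardUnavailable" }
}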