Skip to content

Commit

Permalink
Merge pull request filodb#1734 from sandeep6189/integration
Browse files Browse the repository at this point in the history
Cherry Picking PRs from develop for 0.9.25 release
  • Loading branch information
sandeep6189 authored Mar 4, 2024
2 parents 423e470 + e397401 commit d1b45eb
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import filodb.coordinator.client.QueryCommands.ProtoExecPlan
import filodb.core.QueryTimeoutException
import filodb.core.query.{QueryStats, QueryWarnings, ResultSchema}
import filodb.core.store.ChunkSource
import filodb.query.{QueryResponse, QueryResult, StreamQueryResponse, StreamQueryResultFooter}
import filodb.query.{QueryCommand, QueryResponse, QueryResult, StreamQueryResponse, StreamQueryResultFooter}
import filodb.query.Query.qLogger
import filodb.query.exec.{ExecPlanWithClientParams, PlanDispatcher}

Expand All @@ -24,6 +25,19 @@ import filodb.query.exec.{ExecPlanWithClientParams, PlanDispatcher}
*/
case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends PlanDispatcher {

def getCaseClassOrProtoExecPlan(execPlan: filodb.query.exec.ExecPlan): QueryCommand = {
val doProto = execPlan.queryContext.plannerParams.useProtoExecPlans
val ep =
if (doProto) {
import filodb.coordinator.ProtoConverters._
val protoPlan = execPlan.toExecPlanContainerProto
ProtoExecPlan(execPlan.dataset, protoPlan.toByteArray, execPlan.submitTime)
} else {
execPlan
}
ep
}

def dispatch(plan: ExecPlanWithClientParams, source: ChunkSource)(implicit sched: Scheduler): Task[QueryResponse] = {
// "source" is unused (the param exists to support InProcessDispatcher).
val queryTimeElapsed = System.currentTimeMillis() - plan.execPlan.queryContext.submitTime
Expand All @@ -44,7 +58,8 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
emptyPartialResult
})
} else {
val fut = (target ? plan.execPlan) (t).map {
val message = getCaseClassOrProtoExecPlan(plan.execPlan)
val fut = (target ? message) (t).map {
case resp: QueryResponse => resp
case e => throw new IllegalStateException(s"Received bad response $e")
}
Expand Down Expand Up @@ -83,7 +98,7 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
} else {
ResultActor.subject
.doOnSubscribe(Task.eval {
target.tell(plan.execPlan, ResultActor.resultActor)
target.tell(getCaseClassOrProtoExecPlan(plan.execPlan), ResultActor.resultActor)
qLogger.debug(s"DISPATCHING ${plan.execPlan.planId}")
})
.filter(_.planId == plan.execPlan.planId)
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class ShardKeyRegexPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
|----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=6, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-6#893240335],raw)
|---T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
|----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=22, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-6#893240335],raw)
|--E~PromQlRemoteExec(PromQlQueryParams(test{instance="Inst-1",_ws_="demo",_ns_="App-1"},720,60,960,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10), queryEndpoint=remote-url, requestTimeoutMs=60000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin
|--E~PromQlRemoteExec(PromQlQueryParams(test{instance="Inst-1",_ws_="demo",_ns_="App-1"},720,60,960,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=60000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin
val lp = Parser.queryToLogicalPlan(
"""avg_over_time(test{_ws_ = "demo", _ns_ =~ "App.*", instance = "Inst-1" }[5m:1m])""",
1000, 1000
Expand Down Expand Up @@ -203,7 +203,7 @@ class ShardKeyRegexPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=6, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-6#893240335],raw)
|--T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=22, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-6#893240335],raw)
|-E~PromQlRemoteExec(PromQlQueryParams(test{instance="Inst-1",_ws_="demo",_ns_="App-1"},720,60,960,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10), queryEndpoint=remote-url, requestTimeoutMs=60000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin
|-E~PromQlRemoteExec(PromQlQueryParams(test{instance="Inst-1",_ws_="demo",_ns_="App-1"},720,60,960,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false), queryEndpoint=remote-url, requestTimeoutMs=60000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0)))""".stripMargin
val lp = Parser.queryToLogicalPlan(
"""test{_ws_ = "demo", _ns_ =~ "App.*", instance = "Inst-1" }[5m:1m]""",
1000, 1000
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/filodb.core/query/QueryContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ case class PlannerParams(applicationId: String = "filodb",
processMultiPartition: Boolean = false,
allowPartialResults: Boolean = false,
reduceShardKeyRegexFanout: Boolean = true,
maxShardKeyRegexFanoutBatchSize: Int = 10)
maxShardKeyRegexFanoutBatchSize: Int = 10,
useProtoExecPlans: Boolean = false)

object PlannerParams {
def apply(constSpread: Option[SpreadProvider], sampleLimit: Int): PlannerParams =
Expand Down
33 changes: 18 additions & 15 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@ message PerQueryLimits {
}

message PlannerParams {
optional string applicationId = 1;
optional uint32 queryTimeoutMillis = 2;
optional PerQueryLimits enforcedLimits = 3;
optional PerQueryLimits warnLimits = 4;
optional string queryOrigin = 5;
optional string queryOriginId = 6;
optional string queryPrincipal = 7;
optional bool timeSplitEnabled = 8;
optional uint64 minTimeRangeForSplitMs = 9;
optional uint64 splitSizeMs = 10;
optional bool skipAggregatePresent = 11;
optional bool processFailure = 12;
optional bool processMultiPartition = 13;
optional bool allowPartialResults = 14;
optional bool histogramMap = 15;
optional string applicationId = 1;
optional uint32 queryTimeoutMillis = 2;
optional PerQueryLimits enforcedLimits = 3;
optional PerQueryLimits warnLimits = 4;
optional string queryOrigin = 5;
optional string queryOriginId = 6;
optional string queryPrincipal = 7;
optional bool timeSplitEnabled = 8;
optional uint64 minTimeRangeForSplitMs = 9;
optional uint64 splitSizeMs = 10;
optional bool skipAggregatePresent = 11;
optional bool processFailure = 12;
optional bool processMultiPartition = 13;
optional bool allowPartialResults = 14;
optional bool histogramMap = 15;
optional bool useProtoExecPlans = 16;
optional bool reduceShardKeyRegexFanout = 17;
optional uint32 maxShardKeyRegexFanoutBatchSize = 18;
}

message Request {
Expand Down
10 changes: 9 additions & 1 deletion query/src/main/scala/filodb/query/ProtoConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ object ProtoConverters {
builder.setProcessFailure(pp.processFailure)
builder.setProcessMultiPartition(pp.processMultiPartition)
builder.setAllowPartialResults(pp.allowPartialResults)
builder.setUseProtoExecPlans(pp.useProtoExecPlans)
builder.setReduceShardKeyRegexFanout(pp.reduceShardKeyRegexFanout)
builder.setMaxShardKeyRegexFanoutBatchSize(pp.maxShardKeyRegexFanoutBatchSize)
builder.build()
}
}
Expand Down Expand Up @@ -254,7 +257,12 @@ object ProtoConverters {
processFailure = if (gpp.hasProcessFailure) gpp.getProcessFailure else pp.processFailure,
processMultiPartition = if (gpp.hasProcessMultiPartition) gpp.getProcessMultiPartition
else pp.processMultiPartition,
allowPartialResults = if (gpp.hasAllowPartialResults) gpp.getAllowPartialResults else pp.allowPartialResults
allowPartialResults = if (gpp.hasAllowPartialResults) gpp.getAllowPartialResults else pp.allowPartialResults,
useProtoExecPlans = if (gpp.hasUseProtoExecPlans) gpp.getUseProtoExecPlans else pp.useProtoExecPlans,
reduceShardKeyRegexFanout = if (gpp.hasReduceShardKeyRegexFanout) gpp.getReduceShardKeyRegexFanout
else pp.reduceShardKeyRegexFanout,
maxShardKeyRegexFanoutBatchSize = if (gpp.hasMaxShardKeyRegexFanoutBatchSize)
gpp.getMaxShardKeyRegexFanoutBatchSize else pp.maxShardKeyRegexFanoutBatchSize
)
}
}
Expand Down

0 comments on commit d1b45eb

Please sign in to comment.