Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): reduce size of proto execution plans #1916

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
if (doProto) {
import filodb.coordinator.ProtoConverters._
val protoPlan = execPlan.toExecPlanContainerProto
ProtoExecPlan(execPlan.dataset, protoPlan.toByteArray, execPlan.submitTime)
val protoQueryContext = execPlan.queryContext.toProto
ProtoExecPlan(execPlan.dataset, protoPlan.toByteArray, protoQueryContext.toByteArray, execPlan.submitTime)
} else {
execPlan
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
package filodb.coordinator

import java.net.InetAddress
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.FiniteDuration

import com.typesafe.scalalogging.StrictLogging
import io.grpc.Metadata
import io.grpc.stub.{MetadataUtils, StreamObserver}
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import scala.concurrent.duration.FiniteDuration

import filodb.core.QueryTimeoutException
import filodb.core.store.ChunkSource
import filodb.grpc.GrpcCommonUtils
import filodb.grpc.GrpcMultiPartitionQueryService
import filodb.grpc.GrpcMultiPartitionQueryService.RemoteExecPlan
import filodb.grpc.RemoteExecGrpc
import filodb.query.{QueryResponse, StreamQueryResponse}
import filodb.query.exec.{ExecPlanWithClientParams, GenericRemoteExec, PlanDispatcher}


object GrpcPlanDispatcher {

  import scala.collection.concurrent.TrieMap

  import io.grpc.ManagedChannel

  // Process-wide cache of gRPC channels keyed by endpoint string, so repeated
  // dispatches to the same endpoint reuse one connection instead of re-dialing.
  val channelMap = new TrieMap[String, ManagedChannel]

  // On JVM exit, shut down every cached channel to release sockets cleanly.
  private val channelShutdownHook = new Thread(new Runnable {
    override def run(): Unit = channelMap.values.foreach(_.shutdown())
  })
  Runtime.getRuntime.addShutdownHook(channelShutdownHook)

}

case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends PlanDispatcher {
case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends PlanDispatcher with StrictLogging {

val clusterName = InetAddress.getLocalHost().getHostName()

Expand Down Expand Up @@ -69,7 +67,13 @@ case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends
val genericRemoteExec = plan.execPlan.asInstanceOf[GenericRemoteExec]
import filodb.coordinator.ProtoConverters._
val protoPlan = genericRemoteExec.execPlan.toExecPlanContainerProto

logger.debug(s"Query ${plan.execPlan.queryContext.queryId} proto plan size is ${protoPlan.toByteArray.length}B")
logger.debug(s"Query ${plan.execPlan.queryContext.queryId} exec plan ${genericRemoteExec.execPlan.printTree()}")
val queryContextProto = genericRemoteExec.execPlan.queryContext.toProto
val remoteExecPlan : RemoteExecPlan = RemoteExecPlan.newBuilder()
.setExecPlan(protoPlan)
.setQueryContext(queryContextProto)
.build()
val channel =
GrpcPlanDispatcher.channelMap.getOrElseUpdate(endpoint, GrpcCommonUtils.buildChannelFromEndpoint(endpoint))
val observableResponse: monix.reactive.Observable[GrpcMultiPartitionQueryService.StreamingResponse] = {
Expand All @@ -85,7 +89,7 @@ case class GrpcPlanDispatcher(endpoint: String, requestTimeoutMs: Long) extends
nonBlockingStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(md))
.withDeadlineAfter(requestTimeoutMs, TimeUnit.MILLISECONDS)
.executePlan(
protoPlan,
remoteExecPlan,
new StreamObserver[GrpcMultiPartitionQueryService.StreamingResponse] {
override def onNext(value: GrpcMultiPartitionQueryService.StreamingResponse): Unit =
subject.onNext(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ final class QueryActor(memStore: TimeSeriesStore,
def execProtoExecPlan(pep: ProtoExecPlan, replyTo: ActorRef): Unit = {
  import filodb.coordinator.ProtoConverters._
  // The plan and its QueryContext arrive as two separate serialized protos;
  // deserialize each, then rebuild the ExecPlan with the restored context.
  val planContainer = ExecPlanContainer.parseFrom(pep.serializedExecPlan)
  val contextProto =
    filodb.grpc.GrpcMultiPartitionQueryService.QueryContext.parseFrom(pep.serializedQueryContext)
  val restoredContext = contextProto.fromProto
  val execPlan: ExecPlan = planContainer.fromProto(restoredContext)
  execPhysicalPlan2(execPlan, replyTo)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ object QueryCommands {

// Query command carrying a protobuf-serialized execution plan together with its
// QueryContext, each as an independent byte array.
// NOTE(review): the context is serialized separately from the plan — presumably so
// the (potentially large) QueryContext is encoded once rather than embedded per
// sub-plan; confirm against the deserialization side in QueryActor.execProtoExecPlan.
final case class ProtoExecPlan(dataset: DatasetRef,
                               serializedExecPlan: Array[Byte],      // ExecPlanContainer proto bytes
                               serializedQueryContext: Array[Byte],  // QueryContext proto bytes
                               submitTime: Long = System.currentTimeMillis()) extends QueryCommand
// Error responses from query
final case class UndefinedColumns(undefined: Set[String]) extends ErrorResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ trait DefaultPlanner {
val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(logicalPlanWithoutBucket.startMs,
logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn), qContext,
logicalPlanWithoutBucket.stepMs, logicalPlanWithoutBucket.endMs, window, Some(execRangeFn),
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource = rawSource)))
if (logicalPlanWithoutBucket.function == RangeFunctionId.AbsentOverTime) {
Expand Down Expand Up @@ -208,7 +208,8 @@ trait DefaultPlanner {
case _ => true
})
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(lp.startMs, lp.stepMs, lp.endMs,
window = None, functionId = None, qContext, stepMultipleNotationUsed = false, funcParams = Nil,
window = None, functionId = None,
stepMultipleNotationUsed = false, funcParams = Nil,
lp.offsetMs, rawSource = rawSource)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
Expand Down Expand Up @@ -450,7 +451,6 @@ trait DefaultPlanner {
sqww.atMs.getOrElse(sqww.startMs), sqww.stepMs, sqww.atMs.getOrElse(sqww.endMs),
window,
Some(rangeFn),
qContext,
stepMultipleNotationUsed = false,
paramsExec,
sqww.offsetMs,
Expand Down Expand Up @@ -486,7 +486,6 @@ trait DefaultPlanner {
realScanStartMs, realScanStep, realScanEndMs,
window,
Some(InternalRangeFunction.lpToInternalFunc(RangeFunctionId.Last)),
qContext,
stepMultipleNotationUsed = false,
Seq(),
offsetMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ class SingleClusterPlanner(val dataset: Dataset,
val paramsExec = materializeFunctionArgs(logicalPlanWithoutBucket.functionArgs, qContext)
val window = if (execRangeFn == InternalRangeFunction.Timestamp) None else Some(logicalPlanWithoutBucket.window)
series.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs,
realScanStepMs, realScanEndMs, window, Some(execRangeFn), qContext,
realScanStepMs, realScanEndMs, window, Some(execRangeFn),
logicalPlanWithoutBucket.stepMultipleNotationUsed,
paramsExec, logicalPlanWithoutBucket.offsetMs, rawSource)))

Expand Down Expand Up @@ -879,7 +879,7 @@ class SingleClusterPlanner(val dataset: Dataset,

val rawSeries = walkLogicalPlanTree(lpWithoutBucket.rawSeries, qContext, forceInProcess)
rawSeries.plans.foreach(_.addRangeVectorTransformer(PeriodicSamplesMapper(realScanStartMs, realScanStepMs,
realScanEndMs, None, None, qContext, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))
realScanEndMs, None, None, stepMultipleNotationUsed = false, Nil, lp.offsetMs)))

if (nameFilter.isDefined && nameFilter.head.endsWith("_bucket") && leFilter.isDefined) {
val paramsExec = StaticFuncArgs(leFilter.head.toDouble, RangeParams(realScanStartMs/1000, realScanStepMs/1000,
Expand Down
Loading
Loading