Skip to content

Commit

Permalink
feat(query) Implement Protobuf definitions of all of the execution pl…
Browse files Browse the repository at this point in the history
…ans (#1705)

* Protobuf definitions of all of the execution plans

* updating for changes in exec plans

* accommodating changes in TsCardExec

---------

Co-authored-by: Kier Petrov <[email protected]>
  • Loading branch information
kvpetrov and Kier Petrov authored Feb 5, 2024
1 parent 618eae0 commit 32e4eeb
Show file tree
Hide file tree
Showing 7 changed files with 3,947 additions and 14 deletions.
11 changes: 10 additions & 1 deletion coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import filodb.core.query.QuerySession
import filodb.core.query.QueryStats
import filodb.core.query.SerializedRangeVector
import filodb.core.store.CorruptVectorException
import filodb.grpc.ExecPlans.ExecPlanContainer
import filodb.query._
import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher}

Expand Down Expand Up @@ -291,12 +292,20 @@ final class QueryActor(memStore: TimeSeriesStore,
}
}

def execProtoExecPlan(pep: ProtoExecPlan, replyTo: ActorRef): Unit = {
import filodb.coordinator.ProtoConverters._
val c = ExecPlanContainer.parseFrom(pep.serializedExecPlan)
val plan: ExecPlan = c.fromProto()
execPhysicalPlan2(plan, replyTo)
}

def receive: Receive = {
case q: LogicalPlan2Query => val replyTo = sender()
processLogicalPlan2Query(q, replyTo)
case q: ExplainPlan2Query => val replyTo = sender()
processExplainPlanQuery(q, replyTo)
case q: ExecPlan => execPhysicalPlan2(q, sender())
case q: ExecPlan => execPhysicalPlan2(q, sender())
case q: ProtoExecPlan => execProtoExecPlan(q, sender())
case q: GetTopkCardinality => execTopkCardinalityQuery(q, sender())

case GetIndexNames(ref, limit, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ object QueryCommands {
logicalPlan: LogicalPlan2,
qContext: QueryContext = QueryContext(),
submitTime: Long = System.currentTimeMillis()) extends QueryCommand

final case class ProtoExecPlan(dataset: DatasetRef,
serializedExecPlan: Array[Byte],
submitTime: Long = System.currentTimeMillis()) extends QueryCommand
// Error responses from query
final case class UndefinedColumns(undefined: Set[String]) extends ErrorResponse
final case class BadArgument(msg: String) extends ErrorResponse with QueryResponse
Expand Down
Loading

0 comments on commit 32e4eeb

Please sign in to comment.