feat(query) Add tracing capabilities to gRPC QS along with histogram metrics (#1605)
amolnayak311 authored Jun 12, 2023
1 parent 654b7b0 commit f0b2c4d
Showing 2 changed files with 154 additions and 22 deletions.
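
The change has two halves: request-scoped Kamon tracing for the gRPC query service, driven by Zipkin B3 headers, and a latency histogram tagged by dataset and status. As background, here is a minimal Kamon span-lifecycle sketch (the operation name and the work inside are made up; only the Kamon calls mirror what the diff uses):

import kamon.Kamon

val span = Kamon.spanBuilder("example-op").tag("method", "exec").start()
Kamon.runWithSpan(span, finishSpan = false) {
  // Inside this block Kamon.currentSpan() returns `span`; the gRPC handlers in the diff
  // rely on exactly this to pick up the span started by the interceptor.
  Kamon.currentSpan().mark("doing work")
}
span.finish()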
165 changes: 146 additions & 19 deletions http/src/main/scala/filodb/http/PromQLGrpcServer.scala
@@ -7,9 +7,14 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

import com.typesafe.scalalogging.StrictLogging
import io.grpc.ServerBuilder
import io.grpc.{Metadata, ServerBuilder, ServerCall, ServerCallHandler, ServerInterceptor}
import io.grpc.ServerCall.Listener
import io.grpc.netty.NettyServerBuilder
import io.grpc.stub.StreamObserver
import kamon.Kamon
import kamon.metric.MeasurementUnit
import kamon.trace.{Identifier, Span, Trace}
import kamon.trace.Trace.SamplingDecision
import monix.execution.Scheduler
import net.ceedubs.ficus.Ficus._

@@ -22,6 +27,9 @@ import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query._




/**
*
* @param queryPlannerSelector a function that will map the datasetId (usually cluster-dataset but not always true) to
@@ -36,58 +44,82 @@ class PromQLGrpcServer(queryPlannerSelector: String => QueryPlanner,
filoSettings: FilodbSettings, scheduler: Scheduler)
extends StrictLogging {

val port = filoSettings.allConfig.getInt("filodb.grpc.bind-grpc-port")
val server = ServerBuilder.forPort(this.port)
private val port = filoSettings.allConfig.getInt("filodb.grpc.bind-grpc-port")
private val server = ServerBuilder.forPort(this.port)
.intercept(TracingInterceptor).asInstanceOf[ServerBuilder[NettyServerBuilder]]
//.executor(scheduler).asInstanceOf[ServerBuilder[NettyServerBuilder]]
.addService(new PromQLGrpcService()).asInstanceOf[ServerBuilder[NettyServerBuilder]].build()

val queryAskTimeout = filoSettings.allConfig.as[FiniteDuration]("filodb.query.ask-timeout")
private val queryAskTimeout = filoSettings.allConfig.as[FiniteDuration]("filodb.query.ask-timeout")

private val queryResponseLatency = Kamon.histogram("grpc-query-latency", MeasurementUnit.time.nanoseconds)
.withoutTags()

private class PromQLGrpcService extends RemoteExecImplBase {

private def executeQuery(request: GrpcMultiPartitionQueryService.Request)(f: QueryResponse => Unit): Unit = {
private def executeQuery(request: GrpcMultiPartitionQueryService.Request, span: Span)
(f: QueryResponse => Unit): Unit = {
import filodb.query.ProtoConverters._
implicit val timeout: FiniteDuration = queryAskTimeout
implicit val dispatcherScheduler: Scheduler = scheduler
val queryParams = request.getQueryParams()
span.mark("Sending query request")
val queryParams = request.getQueryParams
val config = QueryContext(origQueryParams = request.getQueryParams.fromProto,
plannerParams = request.getPlannerParams.fromProto)
val eval = Try {
val queryPlanner = queryPlannerSelector(request.getPlannerSelector)
// Catch parsing errors, query materialization and errors in dispatch
val logicalPlan = Parser.queryRangeToLogicalPlan(
queryParams.getPromQL(),
TimeStepParams(queryParams.getStart(), queryParams.getStep(), queryParams.getEnd()))
queryParams.getPromQL,
TimeStepParams(queryParams.getStart, queryParams.getStep, queryParams.getEnd))

val exec = queryPlanner.materialize(logicalPlan, config)
queryPlanner.dispatchExecPlan(exec, kamon.Kamon.currentSpan()).foreach(f)
queryPlanner.dispatchExecPlan(exec, span).foreach(f)
}
eval match {
case Failure(t) =>
logger.error("Caught failure while executing query", t)
f(QueryError(config.queryId, QueryStats(), t))
case _ => //Nop, for success we dont care as the response is already notified
span.fail("Query execution failed", t)
case _ => span.mark("query execution successful")
}
}

//scalastyle:off method.length
override def execStreaming(request: GrpcMultiPartitionQueryService.Request,
responseObserver: StreamObserver[GrpcMultiPartitionQueryService.StreamingResponse]): Unit = {
import filodb.query.ProtoConverters._
import filodb.query.QueryResponseConverter._
executeQuery(request) {
val span = Kamon.currentSpan()
val startNs = System.nanoTime()
executeQuery(request, span) {
// Catch all error
qr: QueryResponse =>
Try {
lazy val rb = SerializedRangeVector.newBuilder()
qr.toStreamingResponse.foreach {
case footer: StreamQueryResultFooter =>
responseObserver.onNext(footer.toProto)
span.mark("Received the footer of streaming response")
span.finish()
val endNs = System.nanoTime()
queryResponseLatency
.withTag("status", "success")
.withTag("dataset", request.getDataset)
.record(endNs - startNs)
responseObserver.onCompleted()
case error: StreamQueryError =>
responseObserver.onNext(error.toProto)
span.fail(error.t)
span.finish()
val endNs = System.nanoTime()
queryResponseLatency
.withTag("status", "error")
.withTag("dataset", request.getDataset)
.record(endNs - startNs)
responseObserver.onCompleted()
case header: StreamQueryResultHeader =>
responseObserver.onNext(header.toProto)
span.mark("Received the header of streaming response")
case result: StreamQueryResult =>
// Not the cleanest way, but we need to convert these IteratorBackedRangeVectors to a
// serializable one. If we have a result, it is definitely a QueryResult
@@ -104,22 +136,29 @@ class PromQLGrpcServer(queryPlannerSelector: String => QueryPlanner,
)
case _ => result
}
span.mark("onNext of the streaming result called")
responseObserver.onNext(strQueryResult.toProto)
}
} match {
// Catch all to ensure onError is invoked
case Failure(t) =>
logger.error("Caught failure while executing query", t)
span.fail(t)
span.finish()
responseObserver.onError(t)
case Success(_) =>
}
}
}
//scalastyle:on method.length

override def exec(request: GrpcMultiPartitionQueryService.Request,
responseObserver: StreamObserver[GrpcMultiPartitionQueryService.Response]): Unit = {
import filodb.query.ProtoConverters._
executeQuery(request) {
val span = Kamon.currentSpan()
val startNs = System.nanoTime()
val hist = queryResponseLatency.withTag("dataset", request.getDataset)
executeQuery(request, span) {
qr: QueryResponse =>
Try {
val queryResponse = qr match {
@@ -139,9 +178,15 @@ class PromQLGrpcServer(queryPlannerSelector: String => QueryPlanner,
responseObserver.onNext(queryResponse.toProto)
} match {
case Failure(t) =>
logger.error("Caught failure while executing query", t)
responseObserver.onError(t)
case Success(_) => responseObserver.onCompleted()
logger.error("Caught failure while executing query", t)
span.fail(t)
span.finish()
hist.withTag("status", "error").record(System.nanoTime() - startNs)
responseObserver.onError(t)
case Success(_) =>
span.finish()
hist.withTag("status", "success").record(System.nanoTime() - startNs)
responseObserver.onCompleted()
}
}
}
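
For reference, the latency metric used in exec and execStreaming follows Kamon's tagged-instrument pattern: the histogram is declared once and every distinct (dataset, status) tag combination becomes its own time series. A minimal sketch with illustrative tag values:

import kamon.Kamon
import kamon.metric.MeasurementUnit

val latency = Kamon.histogram("grpc-query-latency", MeasurementUnit.time.nanoseconds).withoutTags()
val startNs = System.nanoTime()
// ... handle a request ...
latency
  .withTag("dataset", "prometheus")   // illustrative tag values
  .withTag("status", "success")
  .record(System.nanoTime() - startNs)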
@@ -155,11 +200,93 @@ class PromQLGrpcServer(queryPlannerSelector: String => QueryPlanner,
}

def start(): Unit = {
server.start();
logger.info("Server started, listening on " + this.port);
Runtime.getRuntime().addShutdownHook(new Thread() {
server.start()
logger.info("Server started, listening on " + this.port)
Runtime.getRuntime.addShutdownHook(new Thread() {
() => PromQLGrpcServer.this.stop()
})
}

}

object TracingInterceptor extends ServerInterceptor with StrictLogging {
override def interceptCall[ReqT, RespT]
(call: ServerCall[ReqT, RespT], headers: Metadata,
next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
val span = TracingUtil.startAndGetCurrentSpan(headers)
val listener = next.startCall(call, headers)


new Listener[ReqT]() {

override def onHalfClose(): Unit =
Kamon.runWithSpan(span, finishSpan = false) {
listener.onHalfClose()
}

override def onCancel(): Unit =
Kamon.runWithSpan(span, finishSpan = false) {
listener.onCancel()
}

override def onComplete(): Unit =
Kamon.runWithSpan(span, finishSpan = false) {
listener.onComplete()
}

override def onReady(): Unit =
Kamon.runWithSpan(span, finishSpan = false) {
listener.onReady()
}

override def onMessage(message: ReqT): Unit =
Kamon.runWithSpan(span, finishSpan = false) {
listener.onMessage(message)
}
}
}
}
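
A note on the listener wrapper above: grpc-java also provides ForwardingServerCallListener.SimpleForwardingServerCallListener for this delegate-and-decorate pattern, so an equivalent shape (not what the commit uses, just a sketch of the alternative) would be:

import io.grpc.ServerCall
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
import kamon.Kamon
import kamon.trace.Span

// Hypothetical alternative: override only the callbacks that must run with the span current;
// the forwarding base class supplies the pass-through behaviour for everything else.
class SpanPropagatingListener[ReqT](delegate: ServerCall.Listener[ReqT], span: Span)
  extends SimpleForwardingServerCallListener[ReqT](delegate) {
  override def onMessage(message: ReqT): Unit =
    Kamon.runWithSpan(span, finishSpan = false) { super.onMessage(message) }
  override def onHalfClose(): Unit =
    Kamon.runWithSpan(span, finishSpan = false) { super.onHalfClose() }
}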
object TracingUtil extends StrictLogging {
// Currently only OpenZipkin (B3) trace identifiers are supported; support for other propagation formats will be added in the future
private val TRACE_ID_HEADER = "X-B3-TraceId"
private val SPAN_ID_HEADER = "X-B3-SpanId"
private val TRACE_SAMPLED_HEADER = "X-B3-Sampled"
private val PARENT_SPAN_ID_HEADER = "X-B3-ParentSpanId"

def startAndGetCurrentSpan(md: Metadata): Span = {
val parentSpan = getRemoteTrace(md)
val spanBuilder = Kamon.spanBuilder("query_grpc").tag("method", "execStreaming").asChildOf(parentSpan)
spanBuilder.start()
}

private def getRemoteTrace(md: Metadata): Span.Remote = {
val traceIdKey = Metadata.Key.of(TRACE_ID_HEADER, Metadata.ASCII_STRING_MARSHALLER)
val traceIdentifier =
if (md.containsKey(traceIdKey))
Identifier.Scheme.Single.traceIdFactory.from(md.get(traceIdKey))
else
Identifier.Scheme.Single.traceIdFactory.generate()

val spanIdKey = Metadata.Key.of(SPAN_ID_HEADER, Metadata.ASCII_STRING_MARSHALLER)
val spanIdentifier =
if (md.containsKey(spanIdKey))
Identifier.Scheme.Single.traceIdFactory.from(md.get(spanIdKey))
else
Identifier.Scheme.Single.traceIdFactory.generate()

val parentSpanId = Metadata.Key.of(PARENT_SPAN_ID_HEADER, Metadata.ASCII_STRING_MARSHALLER)
val parentSpanIdentifier =
if (md.containsKey(parentSpanId))
Identifier.Scheme.Single.traceIdFactory.from(md.get(parentSpanId))
else
Identifier.Empty

val sampled = Metadata.Key.of(TRACE_SAMPLED_HEADER, Metadata.ASCII_STRING_MARSHALLER)
val sampleTrace = if (md.containsKey(sampled)) "1".equals(md.get(sampled)) else false

if (sampleTrace)
Span.Remote(spanIdentifier, parentSpanIdentifier, Trace.create(traceIdentifier, SamplingDecision.Sample))
else
Span.Remote(spanIdentifier, parentSpanIdentifier, Trace.create(traceIdentifier, SamplingDecision.DoNotSample))
}
}
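
A hedged usage sketch of the extraction above (the header values are standard B3-style examples, not taken from this commit):

import io.grpc.Metadata

val md = new Metadata()
md.put(Metadata.Key.of("X-B3-TraceId", Metadata.ASCII_STRING_MARSHALLER), "80f198ee56343ba864fe8b2a57d3eff7")
md.put(Metadata.Key.of("X-B3-SpanId", Metadata.ASCII_STRING_MARSHALLER), "e457b5a2e4d86bd1")
md.put(Metadata.Key.of("X-B3-Sampled", Metadata.ASCII_STRING_MARSHALLER), "1")

// Yields a "query_grpc" span parented to the remote B3 context carried in the headers.
val serverSpan = TracingUtil.startAndGetCurrentSpan(md)
serverSpan.mark("handled request")
serverSpan.finish()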
11 changes: 8 additions & 3 deletions (second changed file)
@@ -4,8 +4,8 @@ import java.util.concurrent.TimeUnit

import scala.concurrent.Future

import io.grpc.Channel
import io.grpc.stub.StreamObserver
import io.grpc.{Channel, Metadata}
import io.grpc.stub.{MetadataUtils, StreamObserver}
import kamon.Kamon
import kamon.trace.Span
import monix.eval.Task
@@ -89,7 +89,12 @@ case class PromQLGrpcRemoteExec(channel: Channel,
subject
.doOnSubscribe(Task.eval {
val nonBlockingStub = RemoteExecGrpc.newStub(channel)
nonBlockingStub.withDeadlineAfter(requestTimeoutMs, TimeUnit.MILLISECONDS)
val md = new Metadata();
queryContext.traceInfo.foreach {
case (key, value) => md.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
}
nonBlockingStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(md))
.withDeadlineAfter(requestTimeoutMs, TimeUnit.MILLISECONDS)
.execStreaming(getGrpcRequest(plannerSelector),
new StreamObserver[GrpcMultiPartitionQueryService.StreamingResponse] {
override def onNext(value: StreamingResponse): Unit = subject.onNext(value)
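
The client-side change above builds a Metadata from queryContext.traceInfo and attaches it to the stub through an interceptor. A hypothetical, self-contained helper capturing that shape (the helper name and signature are invented for illustration; the deadline value is just an example):

import java.util.concurrent.TimeUnit
import io.grpc.Metadata
import io.grpc.stub.{AbstractStub, MetadataUtils}

// Decorate any generated gRPC stub with B3 trace headers and a deadline before issuing calls.
def withTracing[S <: AbstractStub[S]](stub: S, traceInfo: Map[String, String], timeoutMs: Long = 30000): S = {
  val md = new Metadata()
  traceInfo.foreach { case (key, value) =>
    md.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
  }
  stub
    .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(md))
    .withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS)
}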
