Skip to content

Commit

Permalink
fix scala format.(have a reference for the future.)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Zhang committed Sep 12, 2023
1 parent 7adc382 commit f8034d7
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 52 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/filodb.core/query/RangeVector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ trait RangeVector {
// FIXME remove default in numRows since many impls simply default to None. Shouldn't scalars implement this
def numRows: Option[Int] = None

def isScalar: Boolean = false

def prettyPrint(formatTime: Boolean = true): String = "RV String Not supported"
}

Expand Down Expand Up @@ -199,6 +201,9 @@ sealed trait ScalarSingleValue extends ScalarRangeVector {
def rangeParams: RangeParams
override def outputRange: Option[RvRange] = Some(RvRange(rangeParams.startSecs * 1000,
rangeParams.stepSecs * 1000, rangeParams.endSecs * 1000))
override val isScalar: Boolean = {
outputRange.get.stepMs <= 0 || rangeParams.startSecs == rangeParams.endSecs
}
val numRowsSerialized : Int = 1

override def rows(): RangeVectorCursor = {
Expand Down
2 changes: 2 additions & 0 deletions http/src/main/scala/filodb/http/PrometheusApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import filodb.core.query.{PromQlQueryParams, QueryConfig, QueryContext, TsdbQuer
import filodb.prometheus.ast.TimeStepParams
import filodb.prometheus.parse.Parser
import filodb.query._
import filodb.query.Query.qLogger
import filodb.query.exec.ExecPlan

class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit am: ActorMaterializer)
Expand Down Expand Up @@ -71,6 +72,7 @@ class PrometheusApiRoute(nodeCoord: ActorRef, settings: HttpSettings)(implicit a
{ (query, time, explainOnly, verbose, spread, histMap, step, partialResults) =>
val stepLong = step.map(_.toLong).getOrElse(0L)
val logicalPlan = Parser.queryToLogicalPlan(query, time.toLong, stepLong)
qLogger.info(s"ggggggggggggg logicalPlan $logicalPlan")
askQueryAndRespond(dataset, logicalPlan, explainOnly.getOrElse(false),
verbose.getOrElse(false), spread, PromQlQueryParams(query, time.toLong, stepLong, time.toLong),
histMap.getOrElse(false), partialResults.getOrElse(queryConfig.allowPartialResultsRangeQuery))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filodb.prometheus.query

import remote.RemoteStorage._

import filodb.core.GlobalConfig
import filodb.core.binaryrecord2.{BinaryRecordRowReader, StringifyMapItemConsumer}
import filodb.core.metadata.Column.ColumnType
Expand All @@ -10,6 +9,7 @@ import filodb.core.query.{Result => _, _}
import filodb.prometheus.parse.Parser.REGEX_MAX_LEN
import filodb.query.{QueryResult => FiloQueryResult, _}
import filodb.query.AggregationOperator.Avg
import filodb.query.Query.qLogger
import filodb.query.exec.{ExecPlan, HistToPromSeriesMapper}

object PrometheusModel {
Expand Down Expand Up @@ -92,13 +92,20 @@ object PrometheusModel {
qr.result.map(toHistResult(_, verbose, qr.resultType))
else
qr.result.map(toPromResult(_, verbose, qr.resultType))
SuccessResponse(
Data(toPromResultType(qr.resultType), results.filter(r => r.values.nonEmpty || r.value.isDefined)),
qLogger.info(s"dddddddddddd ${qr} qr.resultType ${qr.resultType}, result ${qr.result}")
val response = SuccessResponse(
if (qr.resultType != QueryResultType.Scalar) {
NonScalarData(toPromResultType(qr.resultType), results.filter(r => r.values.nonEmpty || r.value.isDefined))
} else {
ScalarData(toPromResultType(qr.resultType), results.head.value.get)
},
"success",
Some(qr.mayBePartial), qr.partialResultReason,
Some(toQueryStatistics(qr.queryStats)),
Some(toQueryWarningsResponse(qr.warnings))
)
qLogger.info(s"mmmmmmmmm ${response}")
response
}

def toPromExplainPlanResponse(ex: ExecPlan): ExplainPlanResponse = {
Expand Down Expand Up @@ -221,6 +228,7 @@ object PrometheusModel {
}

def toPromErrorResponse(qe: filodb.query.QueryError): ErrorResponse = {
qLogger.info(s"$qe ((((((((())")
ErrorResponse(qe.t.getClass.getSimpleName, qe.t.getMessage, "error", Some(toQueryStatistics(qe.queryStats)))
}

Expand Down
54 changes: 54 additions & 0 deletions query/src/main/scala/filodb/query/PromCirceSupport.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filodb.query

import io.circe.{Decoder, DecodingFailure, Encoder, HCursor, Json}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.syntax._

import filodb.query.AggregationOperator.Avg
Expand Down Expand Up @@ -165,4 +166,57 @@ object PromCirceSupport {
}
}
}


implicit val aggregateResponseEncoder: Encoder[AggregateResponse] = deriveEncoder[AggregateResponse]
implicit val aggregateSamplEncoder: Encoder[AggregateSampl] = deriveEncoder[AggregateSampl]
implicit val resultEncoder: Encoder[Result] = deriveEncoder[Result]
implicit val scalaEncoder: Encoder[ScalarData] = deriveEncoder[ScalarData]
implicit val nonScalaEncoder: Encoder[NonScalarData] = deriveEncoder[NonScalarData]

implicit val dataEncoder: Encoder[Data] = Encoder.instance {
data => data.resultType match {
case "scalar" =>
Json.obj(
("resultType", Json.fromString(data.resultType)),
("result", data.asInstanceOf[ScalarData].dataSampl.asJson)
)
case _ =>
Json.obj(
("resultType", Json.fromString(data.resultType)),
("result", data.asInstanceOf[NonScalarData].result.asJson)
)
}
// data =>
// Json.obj(
// ("resultType", Json.fromString(data.resultType)),
// ("result", data.asInstanceOf[NonScalarData].asJson)
// )
}

implicit val queryStatisticsEncoder: Encoder[QueryStatistics] = deriveEncoder[QueryStatistics]
implicit val queryWarningsResponseEncoder: Encoder[QueryWarningsResponse] = deriveEncoder[QueryWarningsResponse]

implicit val successResponseEncoder: Encoder[SuccessResponse] = deriveEncoder[SuccessResponse]

// implicit val aggregateResponseDecoder: Decoder[AggregateResponse] = deriveDecoder[AggregateResponse]
// implicit val aggregateResponseDecoderateSamplDecoder: Decoder[AggregateSampl] = deriveDecoder[AggregateSampl]
implicit val resultDecoder: Decoder[Result]= deriveDecoder[Result]


implicit val queryWarningsDesponseEncoder: Decoder[QueryWarningsResponse] = deriveDecoder[QueryWarningsResponse]
implicit val successResponseDecoder: Decoder[SuccessResponse] = deriveDecoder[SuccessResponse]

implicit val dataDecoder: Decoder[Data] = (c: HCursor) => {
for {
resultType <- c.downField("resultType").as[String]
data <- resultType match {
case "scalar" => c.downField("result").as[DataSampl].map(s => ScalarData(resultType, s))
case _ => c.downField("result").as[Seq[Result]].map(s => NonScalarData(resultType,s))
}
} yield {
data
}
}

}
12 changes: 11 additions & 1 deletion query/src/main/scala/filodb/query/PromQueryResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ final case class QueryWarningsResponse(
rawScannedBytes: Long = 0
)

final case class Data(resultType: String, result: Seq[Result])
sealed trait Data {
def resultType: String
def result: Seq[Result]
}
final case class NonScalarData(resultType: String, result: Seq[Result]) extends Data {
}

final case class ScalarData(resultType: String, dataSampl: DataSampl) extends Data {
def result : Seq[Result] = {
Seq(Result(Map(), None, Some(dataSampl), None))
}
}
final case class MetadataSuccessResponse(data: Seq[MetadataSampl],
status: String = "success",
partial: Option[Boolean]= None,
Expand Down
4 changes: 2 additions & 2 deletions query/src/main/scala/filodb/query/ResultTypes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ sealed trait QueryResponse extends NodeResponse with java.io.Serializable {
final case class QueryError(id: String,
queryStats: QueryStats,
t: Throwable) extends QueryResponse with filodb.core.ErrorResponse {
override def toString: String = s"QueryError id=$id ${t.getClass.getName} ${t.getMessage}\n" +
override def toString: String = s"cccccccc QueryError id=$id ${t.getClass.getName} ${t}\n" +
t.getStackTrace.map(_.toString).mkString("\n")
}

Expand Down Expand Up @@ -60,7 +60,7 @@ final case class QueryResult(id: String,
def resultType: QueryResultType = {
result match {
case Nil => QueryResultType.RangeVectors
case Seq(one) if one.key.labelValues.isEmpty && one.numRows.contains(1) => QueryResultType.Scalar
case Seq(one) if one.key.labelValues.isEmpty && one.isScalar => QueryResultType.Scalar
case many: Seq[RangeVector] => if (many.forall(_.numRows.contains(1))) QueryResultType.InstantVector
else QueryResultType.RangeVectors
}
Expand Down
10 changes: 10 additions & 0 deletions query/src/main/scala/filodb/query/exec/ExecPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ trait ExecPlan extends QueryCommand {
}
}
resultTask.onErrorHandle { case ex: Throwable =>
qLogger.info(s"ppppppppppp exception ${ex}")
QueryError(queryContext.queryId, querySession.queryStats, ex)
}
}
Expand All @@ -452,12 +453,14 @@ trait ExecPlan extends QueryCommand {
// materialize, and limit rows per RV
val execPlanString = queryWithPlanName(queryContext)
val srv = SerializedRangeVector(rv, builder, recordSchema, execPlanString, querySession.queryStats)
qLogger.info(s"svr ${srv} eeeeeeeeeee")
if (rv.outputRange.isEmpty)
qLogger.debug(s"Empty rangevector found. Rv class is: ${rv.getClass.getSimpleName}, " +
s"execPlan is: $execPlanString, execPlan children ${this.children}")
srv
}
.map { srv =>
qLogger.info(s"$srv, ttttt srv.numRowsSerialized ${srv.numRowsSerialized}")
// fail the query instead of limiting range vectors and returning incomplete/inaccurate results
numResultSamples += srv.numRowsSerialized
checkSamplesLimit(numResultSamples, querySession.warnings)
Expand All @@ -484,6 +487,11 @@ trait ExecPlan extends QueryCommand {
span.mark(s"numSrv=${r.size}")
span.mark(s"execute-step2-end-${this.getClass.getSimpleName}")
qLogger.debug(s"Finished query execution pipeline with ${r.size} RVs for $this")
if (resultSchema.isEmpty) {
qLogger.warn(s"result $r has empty schema")
}
qLogger.warn(s"hhhhhhhhhhhhhhhhh result $r $resultSchema")

QueryResult(queryContext.queryId, resultSchema, r, querySession.queryStats, querySession.warnings,
querySession.resultCouldBePartial, querySession.partialResultsReason)
}
Expand All @@ -493,6 +501,7 @@ trait ExecPlan extends QueryCommand {
qResult <- step2(res) }
yield { qResult }
val ret = qresp.onErrorRecover { case NonFatal(ex) =>
qLogger.info(s"ffffffffff ${ex}")
QueryError(queryContext.queryId, querySession.queryStats, ex)
}
qLogger.debug(s"Constructed monix query execution pipeline for $this")
Expand Down Expand Up @@ -682,6 +691,7 @@ abstract class NonLeafExecPlan extends ExecPlan {
plan.dispatcher.dispatch(ExecPlanWithClientParams(plan,
ClientParams(plan.queryContext.plannerParams.queryTimeoutMillis - 1000)), source)
.onErrorHandle { ex: Throwable =>
qLogger.info(s"e00000000000 ex ${ex}")
QueryError(queryContext.queryId, qSession.queryStats, ex)
}
}
Expand Down
2 changes: 2 additions & 0 deletions query/src/main/scala/filodb/query/exec/PromQlRemoteExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import filodb.memory.format.ZeroCopyUTF8String._
import filodb.memory.format.vectors.{CustomBuckets, MutableHistogram}
import filodb.query._
import filodb.query.AggregationOperator.Avg
import filodb.query.Query.qLogger

case class PromQlRemoteExec(queryEndpoint: String,
requestTimeoutMs: Long,
Expand Down Expand Up @@ -52,6 +53,7 @@ case class PromQlRemoteExec(queryEndpoint: String,
remoteExecHttpClient.httpPost(queryEndpoint, requestTimeoutMs,
queryContext.submitTime, getUrlParams(), queryContext.traceInfo)
.map { response =>
qLogger.info(s"4444444444 $response")
// Error response from remote partition is a nested json present in response.body
// as response status code is not 2xx
if (response.body.isLeft) {
Expand Down
3 changes: 2 additions & 1 deletion query/src/main/scala/filodb/query/exec/RemoteExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ class RemoteHttpClient private(asyncHttpClientConfig: AsyncHttpClientConfig)
val queryTimeElapsed = System.currentTimeMillis() - submitTime
val readTimeout = FiniteDuration(httpTimeoutMs - queryTimeElapsed, TimeUnit.MILLISECONDS)
val url = uri"$httpEndpoint"
logger.debug("promQlExec url={} traceInfo={}", url, traceInfo)
asJson[SuccessResponse]
logger.info("nnnnnnnnnnnn promQlExec url={} traceInfo={}", url, traceInfo)
sttp
.headers(traceInfo)
.body(urlParams)
Expand Down
112 changes: 71 additions & 41 deletions query/src/test/scala/filodb/query/PromCirceSupportSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -297,62 +297,92 @@ class PromCirceSupportSpec extends AnyFunSpec with Matchers with ScalaFutures {
}
}

it("should parse matrix response") {
val input =
"""[{
| "status": "success",
| "data": {
| "resultType": "matrix",
| "result": [{
| "metric": {
| "a"
| },
| "aggregateResponse": {
| "aggregateValues": [
| [
| 1601491649,
| "15.186417982460787",
| 5
| ],
| [
| 1601491679,
| "14.891293858511071",
| 6
| ],
| [
| 1601491709,
| "14.843819532173134",
| 7
| ],
| [
| 1601491719,
| "NaN",
| 7
| ]
|
| ],
| "function": "avg"
| }
| }]
| },
| "errorType": null,
| "error": null
|}]""".stripMargin
}

it("should parse aggregateResponse") {
val input = """[{
| "status": "success",
| "data": {
| "resultType": "matrix",
| "result": [{
| "metric": {
|
| "_ws_": "aci-telemetry",
| "_ns_": "test"
| },
| "aggregateResponse": {
| "aggregateValues": [
| [
| 1601491649,
| "15.186417982460787",
| 5
| ],
| [
| 1601491679,
| "14.891293858511071",
| 6
| ],
| [
| 1601491709,
| "14.843819532173134",
| 7
| ],
| [
| 1601491719,
| "NaN",
| 7
| ]
|
| ],
| "function": "avg"
| }
| "value": [
| 1619636156,
| "1.8329092E7"
| ]
| }]
| },
| "errorType": null,
| "error": null
|}]""".stripMargin
val expectedResult =List(AvgSampl(1601491649,15.186417982460787,5),
AvgSampl(1601491679,14.891293858511071,6), AvgSampl(1601491709,14.843819532173134,7), AvgSampl(1601491719,
Double.NaN, 7))
// val expectedResult =List(AvgSampl(1601491649,15.186417982460787,5),
// AvgSampl(1601491679,14.891293858511071,6), AvgSampl(1601491709,14.843819532173134,7), AvgSampl(1601491719,
// Double.NaN, 7))

parser.decode[List[SuccessResponse]](input) match {
case Right(successResponse) => val aggregateResponse = successResponse.head.data.result.head.aggregateResponse.get
aggregateResponse.function shouldEqual("avg")
aggregateResponse.aggregateSampl.map(_.asInstanceOf[AvgSampl]).zip(expectedResult).foreach {
case (res, ex) => if (res.value.isNaN) {
ex.value.isNaN shouldEqual(true)
ex.count shouldEqual(res.count)
ex.timestamp shouldEqual(ex.timestamp)
} else ex shouldEqual(res)
}
case Left(ex) => throw ex
try {
println("5555555")
println(parser.decode[List[SuccessResponse]](input).right.get.head.data)
println("ttttttt")

} catch {
case e: Exception => println(e)
}
// parser.decode[List[SuccessResponse]](input) match {
// case Right(successResponse) => val aggregateResponse = successResponse.head.data.getResult.head.aggregateResponse.get
// aggregateResponse.function shouldEqual("avg")
// aggregateResponse.aggregateSampl.map(_.asInstanceOf[AvgSampl]).zip(expectedResult).foreach {
// case (res, ex) => if (res.value.isNaN) {
// ex.value.isNaN shouldEqual(true)
// ex.count shouldEqual(res.count)
// ex.timestamp shouldEqual(ex.timestamp)
// } else ex shouldEqual(res)
// }
// case Left(ex) => throw ex
// }
}

it("should parse sttdev aggregateResponse") {
Expand Down
Loading

0 comments on commit f8034d7

Please sign in to comment.