Merge branch 'develop' into 0.9.25.integration #1719

Merged · Feb 22, 2024 · 19 commits
0a91fd8  misc(query): limit partKey/partId lookup results early (#1692) (sherali42, Dec 8, 2023)
d419d6b  fix(cardinality): Records with overflow doesn't pass dataset + Aggreg… (sandeep6189, Dec 19, 2023)
22e83d0  fix unary expressions. (#1697) (yu-shipit, Dec 20, 2023)
7daca4d  feat(query) Support split partition raw queries (#1677) (amolnayak311, Jan 6, 2024)
95fc796  fix(coord, query): Streaming exec fixes to prevent double subscribe o… (vishramachandran, Jan 16, 2024)
6e43546  fix(query) Fixes for split partition queries (#1701) (amolnayak311, Jan 22, 2024)
8d1ae34  fix(core): Add histogram bucket factor in samples queried metric (#1708) (vishramachandran, Feb 1, 2024)
43fb783  fix(query): metric name ColumnFilter should replace `_bucket` only wh… (sandeep6189, Feb 1, 2024)
63e7dfb  Removing Cardinality V1 API Code and Defaulting to V2 API (#1703) (sandeep6189, Feb 1, 2024)
618eae0  perf(query) Option to disable Lucene caching (#1709) (amolnayak311, Feb 2, 2024)
32e4eeb  feat(query) Implement Protobuf definitions of all of the execution pl… (kvpetrov, Feb 5, 2024)
9659b2a  Setting writes to idempotent and adding debug log for shard recovery … (sandeep6189, Feb 8, 2024)
8f7933f  perf(query): reduce regex query fanout (#1704) (alextheimer, Feb 9, 2024)
aecfbeb  fix(query): ShardKeyRegexPlanner batching fix (#1716) (alextheimer, Feb 15, 2024)
7aa80af  fix(query): histogram queries with PeriodSeriesWithWindowing logical … (sandeep6189, Feb 16, 2024)
feb8a6e  feat(histogram): Adding support for min max for prom and delta histog… (sandeep6189, Feb 20, 2024)
506d888  Merge branch 'develop' into integration (Feb 20, 2024)
77c4c1f  Update version to 0.9.25 (Feb 20, 2024)
dcf2566  Removing duplicate test as part of the merge (Feb 21, 2024)
@@ -42,6 +42,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
       s"INSERT INTO $tableString (partition, ingestion_time, start_time, info) " +
       s"VALUES (?, ?, ?, ?) USING TTL ?")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val deleteIndexCql = session.prepare(
     s"DELETE FROM $tableString WHERE partition=? AND ingestion_time=? AND start_time=?")
@@ -38,6 +38,7 @@ sealed class PartitionKeysByUpdateTimeTable(val dataset: DatasetRef,
       s"INSERT INTO $tableString (shard, epochHour, split, partKey, startTime, endTime) " +
       s"VALUES (?, ?, ?, ?, ?, ?) USING TTL ?")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val readCql = session.prepare(
     s"SELECT * FROM $tableString " +
@@ -36,11 +36,13 @@ sealed class PartitionKeysTable(val dataset: DatasetRef,
       s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
       s"VALUES (?, ?, ?) USING TTL ?")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val writePartitionCqlNoTtl = session.prepare(
       s"INSERT INTO ${tableString} (partKey, startTime, endTime) " +
       s"VALUES (?, ?, ?)")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val scanCql = session.prepare(
     s"SELECT * FROM $tableString " +
@@ -37,11 +37,13 @@ sealed class PartitionKeysV2Table(val dataset: DatasetRef,
       s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
       s"VALUES (?, ?, ?, ?, ?) USING TTL ?")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val writePartitionCqlNoTtl = session.prepare(
       s"INSERT INTO ${tableString} (shard, bucket, partKey, startTime, endTime) " +
       s"VALUES (?, ?, ?, ?, ?)")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val scanCql = session.prepare(
     s"SELECT partKey, startTime, endTime, shard FROM $tableString " +
@@ -45,6 +45,7 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
       s"INSERT INTO $tableString (partition, chunkid, info, chunks) " +
       s"VALUES (?, ?, ?, ?) USING TTL ?")
       .setConsistencyLevel(writeConsistencyLevel)
+      .setIdempotent(true)

   private lazy val deleteChunksCql = session.prepare(
     s"DELETE FROM $tableString WHERE partition=? AND chunkid IN ?")
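All five Cassandra write paths above receive the same one-line change: the prepared INSERT is flagged idempotent. With the DataStax Java driver, retries and speculative executions are only attempted for statements the driver knows are idempotent, which is safe here because these INSERTs are pure upserts. A minimal standalone sketch of the pattern, assuming an illustrative contact point, keyspace, and table (none of these names are from this PR):

    import com.datastax.driver.core.{Cluster, ConsistencyLevel, Session}
    import com.datastax.driver.core.policies.ConstantSpeculativeExecutionPolicy

    object IdempotentWriteSketch extends App {
      // Hypothetical cluster setup. Speculative executions fire only for
      // statements marked idempotent, so the flag below enables them.
      val cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withSpeculativeExecutionPolicy(
          new ConstantSpeculativeExecutionPolicy(200L /* delay ms */, 2 /* max executions */))
        .build()
      val session: Session = cluster.connect("filodb")

      // Same shape as the writePartitionCql statements in this PR: an upsert
      // INSERT, so re-executing it after a timeout cannot corrupt data.
      val writePartitionCql = session.prepare(
          "INSERT INTO partitionkeys (partKey, startTime, endTime) VALUES (?, ?, ?) USING TTL ?")
        .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
        .setIdempotent(true)

      session.execute(writePartitionCql.bind(
        java.nio.ByteBuffer.wrap("pk-0".getBytes), Long.box(0L), Long.box(100L), Int.box(86400)))
      cluster.close()
    }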
@@ -27,7 +27,7 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
   def dispatch(plan: ExecPlanWithClientParams, source: ChunkSource)(implicit sched: Scheduler): Task[QueryResponse] = {
     // "source" is unused (the param exists to support InProcessDispatcher).
     val queryTimeElapsed = System.currentTimeMillis() - plan.execPlan.queryContext.submitTime
-    val remainingTime = plan.clientParams.deadline - queryTimeElapsed
+    val remainingTime = plan.clientParams.deadlineMs - queryTimeElapsed
     lazy val emptyPartialResult: QueryResult = QueryResult(plan.execPlan.queryContext.queryId, ResultSchema.empty, Nil,
       QueryStats(), QueryWarnings(), true, Some("Result may be partial since query on some shards timed out"))

@@ -66,7 +66,7 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
       (implicit sched: Scheduler): Observable[StreamQueryResponse] = {
     // "source" is unused (the param exists to support InProcessDispatcher).
     val queryTimeElapsed = System.currentTimeMillis() - plan.execPlan.queryContext.submitTime
-    val remainingTime = plan.clientParams.deadline - queryTimeElapsed
+    val remainingTime = plan.clientParams.deadlineMs - queryTimeElapsed
     lazy val emptyPartialResult = StreamQueryResultFooter(plan.execPlan.queryContext.queryId, plan.execPlan.planId,
       QueryStats(), QueryWarnings(), true, Some("Result may be partial since query on some shards timed out"))

@@ -82,12 +82,12 @@ case class ActorPlanDispatcher(target: ActorRef, clusterName: String) extends Pl
         })
     } else {
       ResultActor.subject
-        .doOnSubscribe(Task.eval {
-          target.tell(plan.execPlan, ResultActor.resultActor)
-          qLogger.debug(s"Sent to $target the plan ${plan.execPlan}")
-        })
-        .filter(_.planId == plan.execPlan.planId)
-        .takeWhileInclusive(!_.isLast)
+        .doOnSubscribe(Task.eval {
+          target.tell(plan.execPlan, ResultActor.resultActor)
+          qLogger.debug(s"DISPATCHING ${plan.execPlan.planId}")
+        })
+        .filter(_.planId == plan.execPlan.planId)
+        .takeWhileInclusive(!_.isLast)
       // TODO timeout query if response stream not completed in time
     }
   }
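The rename from deadline to deadlineMs makes the unit of the query budget explicit: the remaining time is the millisecond budget minus the time already spent since submission, and a non-positive result is what triggers the partial-result path above. A toy version of that bookkeeping, with ClientParams as a simplified stand-in for the FiloDB type:

    // Simplified stand-in, not the actual FiloDB definition.
    final case class ClientParams(deadlineMs: Long) // total query budget, in milliseconds

    def remainingTimeMs(submitTimeMs: Long, clientParams: ClientParams): Long = {
      val queryTimeElapsed = System.currentTimeMillis() - submitTimeMs
      // <= 0 means the deadline passed before dispatch, the case where the
      // dispatcher replies with the "Result may be partial" response.
      clientParams.deadlineMs - queryTimeElapsed
    }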
21 changes: 14 additions & 7 deletions coordinator/src/main/scala/filodb.coordinator/QueryActor.scala
@@ -27,8 +27,9 @@ import filodb.core.query.QuerySession
 import filodb.core.query.QueryStats
 import filodb.core.query.SerializedRangeVector
 import filodb.core.store.CorruptVectorException
+import filodb.grpc.ExecPlans.ExecPlanContainer
 import filodb.query._
-import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher, PlanDispatcher}
+import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher}

 object QueryActor {
   final case class ThrowException(dataset: DatasetRef)

@@ -87,9 +88,9 @@ final class QueryActor(memStore: TimeSeriesStore,
   private val lpRequests = Kamon.counter("queryactor-logicalPlan-requests").withTags(TagSet.from(tags))
   private val epRequests = Kamon.counter("queryactor-execplan-requests").withTags(TagSet.from(tags))
   private val queryErrors = Kamon.counter("queryactor-query-errors").withTags(TagSet.from(tags))
-  private val uncaughtExceptions = Kamon.counter("queryactor-uncaught-exceptions").withTags(TagSet.from(tags))
   private val numRejectedPlans = Kamon.counter("circuit-breaker-num-rejected-plans").withTags(TagSet.from(tags))

+  private val streamResultsEnabled = config.getBoolean("filodb.query.streaming-query-results-enabled")
   private val circuitBreakerEnabled = config.getBoolean("filodb.query.circuit-breaker.enabled")
   private val circuitBreakerNumFailures = config.getInt("filodb.query.circuit-breaker.open-when-num-failures")
   private val circuitBreakerResetTimeout = config.as[FiniteDuration]("filodb.query.circuit-breaker.reset-timeout")

@@ -117,7 +118,7 @@ final class QueryActor(memStore: TimeSeriesStore,
     queryExecuteSpan.tag("query-id", q.queryContext.queryId)
     val querySession = QuerySession(q.queryContext,
       queryConfig,
-      streamingDispatch = PlanDispatcher.streamingResultsEnabled,
+      streamingDispatch = streamResultsEnabled,
       catchMultipleLockSetErrors = true)
     queryExecuteSpan.mark("query-actor-received-execute-start")

@@ -218,7 +219,7 @@ final class QueryActor(memStore: TimeSeriesStore,
     lpRequests.increment()
     try {
       val execPlan = queryPlanner.materialize(q.logicalPlan, q.qContext)
-      if (PlanDispatcher.streamingResultsEnabled) {
+      if (streamResultsEnabled) {
         val res = queryPlanner.dispatchStreamingExecPlan(execPlan, Kamon.currentSpan())(queryScheduler, 30.seconds)
         queryengine.Utils.streamToFatQueryResponse(q.qContext, res).runToFuture(queryScheduler).onComplete {
           case Success(resp) => replyTo ! resp

@@ -236,8 +237,6 @@
       }
     }
   }
-
-

   private def processExplainPlanQuery(q: ExplainPlan2Query, replyTo: ActorRef): Unit = {
     if (checkTimeoutBeforeQueryExec(q.qContext, replyTo)) {
       try {

@@ -293,12 +292,20 @@
     }
   }

+  def execProtoExecPlan(pep: ProtoExecPlan, replyTo: ActorRef): Unit = {
+    import filodb.coordinator.ProtoConverters._
+    val c = ExecPlanContainer.parseFrom(pep.serializedExecPlan)
+    val plan: ExecPlan = c.fromProto()
+    execPhysicalPlan2(plan, replyTo)
+  }
+
   def receive: Receive = {
     case q: LogicalPlan2Query => val replyTo = sender()
                                  processLogicalPlan2Query(q, replyTo)
     case q: ExplainPlan2Query => val replyTo = sender()
                                  processExplainPlanQuery(q, replyTo)
-    case q: ExecPlan => execPhysicalPlan2(q, sender())
+    case q: ExecPlan => execPhysicalPlan2(q, sender())
+    case q: ProtoExecPlan => execProtoExecPlan(q, sender())
     case q: GetTopkCardinality => execTopkCardinalityQuery(q, sender())

     case GetIndexNames(ref, limit, _) =>
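The new execProtoExecPlan handler is the receiving half of the Protobuf plan transport: it parses the ExecPlanContainer bytes and converts them back into an ExecPlan before executing through the normal path. A hypothetical sender-side sketch, assuming a toProto converter symmetric to the fromProto used above (that converter name is an assumption, not confirmed by this diff):

    import akka.actor.ActorRef
    import filodb.coordinator.QueryCommands.ProtoExecPlan
    import filodb.core.DatasetRef
    import filodb.query.exec.ExecPlan

    // Hypothetical: serialize a plan via the assumed toProto side of
    // ProtoConverters and ship the raw bytes; QueryActor will do
    // parseFrom + fromProto + execPhysicalPlan2 on receipt.
    def sendAsProto(queryActor: ActorRef, dataset: DatasetRef, plan: ExecPlan): Unit = {
      import filodb.coordinator.ProtoConverters._      // assumed to provide plan.toProto
      val bytes: Array[Byte] = plan.toProto.toByteArray // ExecPlanContainer wire format
      queryActor ! ProtoExecPlan(dataset, bytes)        // submitTime defaults to now
    }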
@@ -8,11 +8,12 @@ import filodb.coordinator.ActorSystemHolder.system
 import filodb.query.Query.qLogger
 import filodb.query.StreamQueryResponse

-
 object ResultActor {
-  def props(subject: ConcurrentSubject[StreamQueryResponse, StreamQueryResponse]): Props =
+  def props(subject: ConcurrentSubject[StreamQueryResponse, StreamQueryResponse]): Props = {
     Props(classOf[ResultActor], subject)
-  lazy val subject = ConcurrentSubject[StreamQueryResponse](MulticastStrategy.Publish)(QueryScheduler.queryScheduler)
+  }
+
+  lazy val subject = ConcurrentSubject[StreamQueryResponse](MulticastStrategy.publish)(QueryScheduler.queryScheduler)
   lazy val resultActor = system.actorOf(Props(new ResultActor(subject)))
 }

@@ -22,7 +23,7 @@ class ResultActor(subject: ConcurrentSubject[StreamQueryResponse, StreamQueryRes
     case q: StreamQueryResponse =>
       try {
         subject.onNext(q)
-        qLogger.debug(s"Result Actor got ${q.getClass} as response from ${sender()}")
+        qLogger.debug(s"Result Actor got ${q.getClass.getSimpleName} for plan ${q.planId}")
       } catch {
         case e: Throwable =>
           qLogger.error(s"Exception when processing $q", e)
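The subject above is the hub of the streaming result path: every response is pushed onto one multicast ConcurrentSubject, and each dispatcher picks out its own stream by planId and stops after the final message, exactly the filter/takeWhileInclusive chain in ActorPlanDispatcher. A self-contained Monix sketch of that pattern with toy types (Resp stands in for StreamQueryResponse):

    import monix.execution.Scheduler.Implicits.global
    import monix.reactive.MulticastStrategy
    import monix.reactive.subjects.ConcurrentSubject

    // Toy stand-in for StreamQueryResponse: an id plus an is-last flag.
    final case class Resp(planId: String, isLast: Boolean)

    object MulticastSketch extends App {
      val subject = ConcurrentSubject[Resp](MulticastStrategy.publish)

      // Consumer for plan "p1": keep only its messages, complete on the footer,
      // mirroring .filter(_.planId == ...).takeWhileInclusive(!_.isLast) above.
      val done = subject
        .filter(_.planId == "p1")
        .takeWhileInclusive(!_.isLast)
        .foreach(r => println(s"p1 consumer got $r"))

      subject.onNext(Resp("p1", isLast = false))
      subject.onNext(Resp("p2", isLast = false)) // ignored by the p1 consumer
      subject.onNext(Resp("p1", isLast = true))  // final message: stream completes

      scala.concurrent.Await.ready(done, scala.concurrent.duration.Duration("2s"))
    }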
@@ -65,7 +65,7 @@ case class TenantIngestionMetering(settings: FilodbSettings,
     dsIterProducer().foreach { dsRef =>
       val fut = Client.asyncAsk(
         coordActorProducer(),
-        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, 2, overrideClusterName = CLUSTER_TYPE)),
+        LogicalPlan2Query(dsRef, TsCardinalities(prefix, numGroupByFields, overrideClusterName = CLUSTER_TYPE)),
         ASK_TIMEOUT)
       fut.onComplete {
         case Success(QueryResult(_, _, rv, _, _, _, _)) =>
@@ -83,6 +83,10 @@ object QueryCommands {
                                logicalPlan: LogicalPlan2,
                                qContext: QueryContext = QueryContext(),
                                submitTime: Long = System.currentTimeMillis()) extends QueryCommand
+
+  final case class ProtoExecPlan(dataset: DatasetRef,
+                                 serializedExecPlan: Array[Byte],
+                                 submitTime: Long = System.currentTimeMillis()) extends QueryCommand
   // Error responses from query
   final case class UndefinedColumns(undefined: Set[String]) extends ErrorResponse
   final case class BadArgument(msg: String) extends ErrorResponse with QueryResponse