Release 0.9.8 Integration
tjackpaul authored Jul 7, 2020
2 parents 462933e + a4a16cc commit 5e91a44
Showing 92 changed files with 2,454 additions and 786 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -1,7 +1,7 @@
# Contribution Guidelines

Thank you for thinking of contributing to FiloDB! We welcome all contributions through Github Pull Requests.
When you create a new PR, please be sure to review the guidelines below. Then create a branch based on `develop` (not `master`).
When you create a new PR, please be sure to review the guidelines below. Then create a branch based on `develop` (not `main`).

## <a name="pullrequest"></a> Pull Requests Guidelines
Pull requests are the only means by which you can contribute to this project. Please follow these steps when submitting a pull request:
@@ -221,7 +221,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {

val parts = lz4ColStore.readRawPartitions(dataset.ref, 0.millis.toMillis, partScan).toListL.runAsync.futureValue
parts should have length (1)
parts(0).chunkSets should have length (1)
parts(0).chunkSets(0).vectors.toSeq shouldEqual sourceChunks.head.chunks
parts(0).chunkSetsTimeOrdered should have length (1)
parts(0).chunkSetsTimeOrdered(0).vectors.toSeq shouldEqual sourceChunks.head.chunks
}
}
195 changes: 195 additions & 0 deletions cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala
@@ -0,0 +1,195 @@
package filodb.cassandra.columnstore

import scala.concurrent.Future

import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}

import filodb.cassandra.DefaultFiloSessionProvider
import filodb.core.{MachineMetricsData, TestData}
import filodb.core.binaryrecord2.{BinaryRecordRowReader, RecordBuilder}
import filodb.core.downsample.OffHeapMemory
import filodb.core.memstore._
import filodb.core.memstore.FiloSchedulers.QuerySchedName
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.Filter.Equals
import filodb.core.store.{InMemoryMetaStore, PartKeyRecord, StoreConfig, TimeRangeChunkScan}
import filodb.memory.format.ZeroCopyUTF8String._
import filodb.query.{QueryResponse, QueryResult}
import filodb.query.exec.{InProcessPlanDispatcher, MultiSchemaPartitionsExec}

class OdpSpec extends FunSpec with Matchers with BeforeAndAfterAll with ScalaFutures {

implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))

val config = ConfigFactory.load("application_test.conf").getConfig("filodb")

implicit val s = monix.execution.Scheduler.Implicits.global
lazy val session = new DefaultFiloSessionProvider(config.getConfig("cassandra")).session
lazy val colStore = new CassandraColumnStore(config, s, session)

val rawDataStoreConfig = StoreConfig(ConfigFactory.parseString( """
|flush-interval = 1h
|shard-mem-size = 1MB
|ingestion-buffer-mem-size = 30MB
""".stripMargin))

val offheapMem = new OffHeapMemory(Seq(Schemas.gauge),
Map.empty, 100, rawDataStoreConfig)
val schemas = Schemas(Schemas.gauge)

val dataset = Dataset("prometheus", Schemas.gauge)
val gaugeName = "my_gauge"
var gaugePartKeyBytes: Array[Byte] = _
val seriesTags = Map("_ws_".utf8 -> "my_ws".utf8, "_ns_".utf8 -> "my_ns".utf8)
val shardStats = new TimeSeriesShardStats(dataset.ref, -1)

val firstSampleTime = 74373042000L
val numSamples = 100
val queryScheduler = Scheduler.fixedPool(s"$QuerySchedName", 3)

// First create the tables in C*
override def beforeAll(): Unit = {
super.beforeAll()
colStore.initialize(dataset.ref, 1).futureValue
colStore.truncate(dataset.ref, 1).futureValue
}

override def afterAll(): Unit = {
super.afterAll()
queryScheduler.shutdown()
offheapMem.free()
}

it ("should write gauge data to cassandra") {
val partBuilder = new RecordBuilder(offheapMem.nativeMemoryManager)
val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName, seriesTags)

val part = new TimeSeriesPartition(0, Schemas.gauge, partKey,
0, offheapMem.bufferPools(Schemas.gauge.schemaHash), shardStats,
offheapMem.nativeMemoryManager, 1)

gaugePartKeyBytes = part.partKeyBytes

val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

MachineMetricsData.records(dataset, rawSamples).records.foreach { case (base, offset) =>
val rr = new BinaryRecordRowReader(Schemas.gauge.ingestionSchema, base, offset)
part.ingest(System.currentTimeMillis(), rr, offheapMem.blockMemFactory)
part.switchBuffers(offheapMem.blockMemFactory, true)
}
val chunks = part.makeFlushChunks(offheapMem.blockMemFactory)

colStore.write(dataset.ref, Observable.fromIterator(chunks)).futureValue
val pk = PartKeyRecord(gaugePartKeyBytes, firstSampleTime, firstSampleTime + numSamples, Some(150))
colStore.writePartKeys(dataset.ref, 0, Observable.now(pk), 259200, 34).futureValue
}

it ("should be able to do full ODP for non concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

val rvs = query(memStore).futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples
} finally {
memStore.shutdown()
}
}

it ("should be able to do full ODP for concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// issue 2 concurrent queries
val res = (0 to 1).map(_ => query(memStore))

// all queries should result in all chunks
res.foreach { r =>
val rvs = r.futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples
}
} finally {
memStore.shutdown()
}
}

it ("should be able to do partial ODP for non concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// ingest some more samples to trigger partial ODP
val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + numSamples + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

memStore.ingest(dataset.ref, 0, SomeData(MachineMetricsData.records(dataset, rawSamples).records, 300))

val rvs = query(memStore).futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples * 2
} finally {
memStore.shutdown()
}
}

it ("should be able to do partial ODP for concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// ingest some more samples to trigger partial ODP
val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + numSamples + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

memStore.ingest(dataset.ref, 0, SomeData(MachineMetricsData.records(dataset, rawSamples).records, 300))

// issue 2 concurrent queries
val res = (0 to 1).map(_ => query(memStore))

// all queries should result in all chunks
res.foreach { r =>
val rvs = r.futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples * 2
}
} finally {
memStore.shutdown()
}
}

def query(memStore: TimeSeriesMemStore): Future[QueryResponse] = {
val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
val exec = MultiSchemaPartitionsExec(QueryContext(sampleLimit = numSamples * 2), InProcessPlanDispatcher,
dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 * numSamples))
val queryConfig = new QueryConfig(config.getConfig("query"))
val querySession = QuerySession(QueryContext(), queryConfig)
exec.execute(memStore, querySession)(queryScheduler).runAsync(queryScheduler)
}
}
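
For reference, a minimal read-back sketch (not part of this file): it assumes a SinglePartitionScan over gaugePartKeyBytes on shard 0 and reuses the readRawPartitions / chunkSetsTimeOrdered calls exercised in the CassandraColumnStoreSpec hunk above to verify what the first test wrote:

import scala.concurrent.duration._
import filodb.core.store.SinglePartitionScan

// Read back the raw chunks for the gauge partition written in the first test.
// SinglePartitionScan(partKeyBytes, shard) is assumed to be the right scan helper here.
val partScan = SinglePartitionScan(gaugePartKeyBytes, 0)
val readParts = colStore.readRawPartitions(dataset.ref, 0.millis.toMillis, partScan)
  .toListL.runAsync.futureValue
readParts should have length (1)
readParts.head.chunkSetsTimeOrdered should have length (1)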

11 changes: 6 additions & 5 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -9,7 +9,6 @@ import scala.util.Try

import com.opencsv.CSVWriter
import com.quantifind.sumac.{ArgMain, FieldArgs}
import com.typesafe.config.ConfigFactory
import monix.reactive.Observable
import org.scalactic._

@@ -19,7 +18,7 @@ import filodb.coordinator.client.QueryCommands.StaticSpreadProvider
import filodb.core._
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{Column, Schemas}
import filodb.core.query.{PromQlQueryParams, QueryContext, TsdbQueryParams, UnavailablePromQlQueryParams}
import filodb.core.query._
import filodb.core.store.ChunkSetInfoOnHeap
import filodb.memory.MemFactory
import filodb.memory.format.{BinaryVector, Classes, MemoryReader, RowReader}
@@ -266,7 +265,9 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
def parseLabelValuesQuery(client: LocalClient, labelNames: Seq[String], constraints: Map[String, String], dataset: String,
timeParams: TimeRangeParams,
options: QOptions): Unit = {
val logicalPlan = LabelValues(labelNames, constraints, 3.days.toMillis)
// TODO support all filters
val filters = constraints.map { case (k, v) => ColumnFilter(k, Filter.Equals(v)) }.toSeq
val logicalPlan = LabelValues(labelNames, filters, timeParams.start * 1000, timeParams.end * 1000)
executeQuery2(client, dataset, logicalPlan, options, UnavailablePromQlQueryParams)
}

@@ -275,8 +276,8 @@ object CliMain extends ArgMain[Arguments] with FilodbClusterNode {
options: QOptions): Unit = {
val logicalPlan = Parser.queryRangeToLogicalPlan(query, timeParams)
// Routing is not supported with CLI
executeQuery2(client, dataset, logicalPlan, options, PromQlQueryParams(ConfigFactory.empty, query,timeParams.start, timeParams.step,
timeParams.end))
executeQuery2(client, dataset, logicalPlan, options,
PromQlQueryParams(query,timeParams.start, timeParams.step, timeParams.end))
}

def promFilterToPartKeyBr(query: String, schemaName: String): Unit = {
@@ -2,11 +2,10 @@ package com.esotericsoftware.kryo.io

import com.esotericsoftware.kryo.{Serializer => KryoSerializer}
import com.esotericsoftware.kryo.Kryo
import com.typesafe.config.{ ConfigFactory, ConfigRenderOptions}
import com.typesafe.scalalogging.StrictLogging

import filodb.core.binaryrecord2.{RecordSchema => RecordSchema2}
import filodb.core.query.{ColumnInfo, PartitionInfo, PartitionRangeVectorKey, PromQlQueryParams}
import filodb.core.query.{ColumnInfo, PartitionInfo, PartitionRangeVectorKey}
import filodb.memory.format._

// NOTE: This file has to be in the kryo namespace so we can use the require() method
@@ -79,26 +78,3 @@ class PartitionInfoSerializer extends KryoSerializer[PartitionInfo] {
output.writeInt(info.shardNo)
}
}

class PromQlQueryParamsSerializer extends KryoSerializer[PromQlQueryParams] {
override def read(kryo: Kryo, input: Input, typ: Class[PromQlQueryParams]): PromQlQueryParams = {
val config = ConfigFactory.parseString(input.readString())
val promQl = input.readString()
val start = input.readLong()
val step = input.readLong()
val end = input.readLong()
val spreadInt = input.readInt()
val spread = if (spreadInt == -1) None else Some(spreadInt)
val procFailure = input.readBoolean()
PromQlQueryParams(config, promQl, start, step, end, spread, procFailure)
}
override def write(kryo: Kryo, output: Output, promParam: PromQlQueryParams): Unit = {
output.writeString(promParam.config.root().render(ConfigRenderOptions.concise()))
output.writeString(promParam.promQl)
output.writeLong(promParam.startSecs)
output.writeLong(promParam.stepSecs)
output.writeLong(promParam.endSecs)
output.writeInt(promParam.spread.getOrElse(-1))
output.writeBoolean(promParam.processFailure)
}
}
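
With the Config field gone from PromQlQueryParams, this custom Kryo serializer is no longer needed and the case class can fall back to default serialization. A minimal sketch of the slimmed-down params, with field names taken from the serializer removed above and the four-argument form mirroring the new CliMain call site (defaults for spread and processFailure are assumed):

import filodb.core.query.PromQlQueryParams

// promQl, startSecs, stepSecs, endSecs; spread and processFailure use their defaults.
val promParams = PromQlQueryParams("sum(rate(my_gauge[5m]))",
  startSecs = 74373042L, stepSecs = 60L, endSecs = 74373142L)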
@@ -98,7 +98,6 @@ class KryoInit {
kryo.register(classOf[QueryCommands.BadQuery])
kryo.register(classOf[QueryContext])
kryo.register(classOf[QueryCommands.FilteredPartitionQuery])
kryo.register(classOf[PromQlQueryParams], new PromQlQueryParamsSerializer)
}
}

@@ -5,7 +5,7 @@ import com.typesafe.scalalogging.StrictLogging
import filodb.core.DatasetRef
import filodb.core.query.{PromQlQueryParams, QueryConfig, QueryContext}
import filodb.query.LogicalPlan
import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher, PromQlExec, StitchRvsExec}
import filodb.query.exec.{ExecPlan, InProcessPlanDispatcher, PromQlRemoteExec, StitchRvsExec}

/**
* HighAvailabilityPlanner responsible for using underlying local planner and FailureProvider
@@ -23,9 +23,15 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
failureProvider: FailureProvider,
queryConfig: QueryConfig) extends QueryPlanner with StrictLogging {

import net.ceedubs.ficus.Ficus._
import LogicalPlanUtils._
import QueryFailureRoutingStrategy._

val remoteHttpEndpoint: String = queryConfig.routingConfig.getString("remote.http.endpoint")

val remoteHttpTimeoutMs: Long =
queryConfig.routingConfig.config.as[Option[Long]]("remote.http.timeout").getOrElse(60000)

/**
* Converts Route objects returned by FailureProvider to ExecPlan
*/
@@ -43,24 +49,25 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
// Offset logic is handled in ExecPlan
localPlanner.materialize(
copyWithUpdatedTimeRange(rootLogicalPlan, TimeRange(timeRange.startMs + offsetMs,
timeRange.endMs + offsetMs) , lookBackTime), qContext)
timeRange.endMs + offsetMs)), qContext)
}
case route: RemoteRoute =>
val timeRange = route.timeRange.get
val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
// Divide by 1000 to convert millis to seconds. PromQL params are in seconds.
val promQlParams = PromQlQueryParams(queryConfig.routingConfig, queryParams.promQl,
val promQlParams = PromQlQueryParams(queryParams.promQl,
(timeRange.startMs + offsetMs) / 1000, queryParams.stepSecs, (timeRange.endMs + offsetMs) / 1000,
queryParams.spread, processFailure = false)
logger.debug("PromQlExec params:" + promQlParams)
PromQlExec(qContext, InProcessPlanDispatcher, dsRef, promQlParams)
PromQlRemoteExec(remoteHttpEndpoint, remoteHttpTimeoutMs,
qContext, InProcessPlanDispatcher, dsRef, promQlParams)
}
}

if (execPlans.size == 1) execPlans.head
else StitchRvsExec(qContext,
InProcessPlanDispatcher,
execPlans.sortWith((x, y) => !x.isInstanceOf[PromQlExec]))
execPlans.sortWith((x, y) => !x.isInstanceOf[PromQlRemoteExec]))
// ^^ Stitch RemoteExec plan results with local using InProcessPlanDispatcher
// Sort to move RemoteExec to the end as it does not have schema

@@ -70,7 +77,7 @@

// lazy because we want to fetch failures only if needed
lazy val offsetMillis = LogicalPlanUtils.getOffsetMillis(logicalPlan)
lazy val periodicSeriesTime = getPeriodicSeriesTimeFromLogicalPlan(logicalPlan)
lazy val periodicSeriesTime = getTimeFromLogicalPlan(logicalPlan)
lazy val periodicSeriesTimeWithOffset = TimeRange(periodicSeriesTime.startMs - offsetMillis,
periodicSeriesTime.endMs - offsetMillis)
lazy val lookBackTime = getLookBackMillis(logicalPlan)
@@ -84,7 +91,8 @@
if (!logicalPlan.isRoutable ||
!tsdbQueryParams.isInstanceOf[PromQlQueryParams] || // We don't know the promql issued (unusual)
(tsdbQueryParams.isInstanceOf[PromQlQueryParams]
&& !tsdbQueryParams.asInstanceOf[PromQlQueryParams].processFailure) || // This is a query that was part of
&& !tsdbQueryParams.asInstanceOf[PromQlQueryParams].processFailure) || // This is a query that was
// part of failure routing
!hasSingleTimeRange(logicalPlan) || // Sub queries have different time ranges (unusual)
failures.isEmpty) { // no failures in query time range
localPlanner.materialize(logicalPlan, qContext)
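
The PromQlRemoteExec path introduced above depends on two routing settings that the planner now reads itself. A minimal sketch of a matching routing block (the key names come from the lookups above; the endpoint URL and where this block lives in the FiloDB configuration are assumptions):

import com.typesafe.config.ConfigFactory

// Hypothetical routing block, consumed via queryConfig.routingConfig.
val routing = ConfigFactory.parseString(
  """
    |remote.http.endpoint = "http://buddy-cluster:8080/promql/prometheus/api"
    |remote.http.timeout = 10000
  """.stripMargin)

routing.getString("remote.http.endpoint")  // target endpoint for PromQlRemoteExec
routing.getLong("remote.http.timeout")     // in millis; the planner defaults to 60000 when absent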
