Merge integration to master for 0.9.8 release
jackson-paul committed Jul 15, 2020
2 parents 7388950 + 22c1da6 commit 6b5f8a1
Showing 99 changed files with 2,723 additions and 833 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -1,7 +1,7 @@
# Contribution Guidelines

Thank you for thinking of contributing to FiloDB! We welcome all contributions through GitHub Pull Requests.
When you create a new PR, please be sure to review the guidelines below. Then create a branch based on `develop` (not `master`).
When you create a new PR, please be sure to review the guidelines below. Then create a branch based on `develop` (not `main`).

## <a name="pullrequest"></a> Pull Requests Guidelines
Pull requests are the only means by which you can contribute to this project; please follow these steps when submitting a pull request:
@@ -187,33 +187,35 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
* handle this case.
*/
// scalastyle:off parameter.number
def getChunksByIngestionTimeRange(datasetRef: DatasetRef,
splits: Iterator[ScanSplit],
ingestionTimeStart: Long,
ingestionTimeEnd: Long,
userTimeStart: Long,
endTimeExclusive: Long,
maxChunkTime: Long,
batchSize: Int,
batchTime: FiniteDuration): Observable[Seq[RawPartData]] = {
val partKeys = Observable.fromIterator(splits).flatMap {
def getChunksByIngestionTimeRangeNoAsync(datasetRef: DatasetRef,
splits: Iterator[ScanSplit],
ingestionTimeStart: Long,
ingestionTimeEnd: Long,
userTimeStart: Long,
endTimeExclusive: Long,
maxChunkTime: Long,
batchSize: Int): Iterator[Seq[RawPartData]] = {
val partKeys = splits.flatMap {
case split: CassandraTokenRangeSplit =>
val indexTable = getOrCreateIngestionTimeIndexTable(datasetRef)
logger.debug(s"Querying cassandra for partKeys for split=$split ingestionTimeStart=$ingestionTimeStart " +
s"ingestionTimeEnd=$ingestionTimeEnd")
indexTable.scanPartKeysByIngestionTime(split.tokens, ingestionTimeStart, ingestionTimeEnd)
indexTable.scanPartKeysByIngestionTimeNoAsync(split.tokens, ingestionTimeStart, ingestionTimeEnd)
case split => throw new UnsupportedOperationException(s"Unknown split type $split seen")
}

import filodb.core.Iterators._

val chunksTable = getOrCreateChunkTable(datasetRef)
partKeys.bufferTimedAndCounted(batchTime, batchSize).map { parts =>
partKeys.sliding(batchSize, batchSize).map { parts =>
logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " +
s"endTimeExclusive=$endTimeExclusive maxChunkTime=$maxChunkTime")
// TODO evaluate if we can increase parallelism here. This needs to be tuneable
// based on how much faster downsampling should run, and how much additional read load cassandra can take.
chunksTable.readRawPartitionRangeBB(parts, userTimeStart - maxChunkTime, endTimeExclusive).toIterator().toSeq
// This could be more parallel, but the decision was made to control parallelism in one place: in Spark (via its
// parallelism configuration). Revisit if needed later.
val batchReadSpan = Kamon.spanBuilder("cassandra-per-batch-data-read-latency").start()
try {
chunksTable.readRawPartitionRangeBBNoAsync(parts, userTimeStart - maxChunkTime, endTimeExclusive)
} finally {
batchReadSpan.finish()
}
}
}
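
The new synchronous path batches the part-key iterator with sliding(batchSize, batchSize) and wraps each batch read in try/finally so the Kamon span is closed even if the read throws. Below is a minimal, dependency-free sketch of that shape; readBatch and readInBatches are hypothetical stand-ins (a plain timer replaces the Kamon span), not FiloDB APIs.

    // Sketch of the batch-and-time pattern in getChunksByIngestionTimeRangeNoAsync.
    // readBatch is a hypothetical stand-in for chunksTable.readRawPartitionRangeBBNoAsync.
    def readBatch(partKeys: Seq[String]): Seq[String] = partKeys.map(pk => s"chunks-for-$pk")

    def readInBatches(partKeys: Iterator[String], batchSize: Int): Iterator[Seq[String]] =
      partKeys.sliding(batchSize, batchSize).map { batch =>
        val start = System.nanoTime()        // Kamon.spanBuilder(...).start() in the real code
        try {
          readBatch(batch)                   // one Cassandra read per batch of partitions
        } finally {
          val elapsedMs = (System.nanoTime() - start) / 1e6
          println(f"read ${batch.size} partitions in $elapsedMs%.1f ms")   // span.finish() in the real code
        }
      }

    // Usage: readInBatches(Iterator.tabulate(250)(i => s"pk$i"), 100).map(_.size).toList == List(100, 100, 50)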

@@ -5,7 +5,6 @@ import java.nio.ByteBuffer
import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.{ConsistencyLevel, ResultSet, Row}
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
@@ -101,10 +100,10 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
}
}

def scanPartKeysByIngestionTime(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Observable[ByteBuffer] = {
val it = tokens.iterator.flatMap { case (start, end) =>
def scanPartKeysByIngestionTimeNoAsync(tokens: Seq[(String, String)],
ingestionTimeStart: Long,
ingestionTimeEnd: Long): Iterator[ByteBuffer] = {
tokens.iterator.flatMap { case (start, end) =>
/*
* FIXME conversion of tokens to Long works only for Murmur3Partitioner because it generates
* Long based tokens. If other partitioners are used, this can potentially break.
@@ -114,10 +113,8 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
end.toLong: java.lang.Long,
ingestionTimeStart: java.lang.Long,
ingestionTimeEnd: java.lang.Long)
session.execute(stmt).iterator.asScala
.map { row => row.getBytes("partition") }
session.execute(stmt).asScala.map { row => row.getBytes("partition") }.toSet.iterator
}
Observable.fromIterator(it).handleObservableErrors
}
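
Because the same partition key can appear in more than one index row within a token range, the synchronous scan dedupes each range's results with .toSet.iterator before moving on. An illustrative sketch of that flatten-and-dedupe shape, using plain Scala collections in place of the Cassandra session (the token ranges and part keys below are made up):

    // Illustrative only: iterate token ranges, fetch rows per range, dedupe part keys.
    // rowsPerRange stands in for the rows returned by session.execute(stmt) for each token range.
    val tokenRanges: Seq[(String, String)] = Seq(("-100", "0"), ("0", "100"))
    val rowsPerRange: Map[(String, String), Seq[String]] = Map(
      ("-100", "0") -> Seq("pkA", "pkB", "pkA"),   // pkA indexed twice in this range
      ("0", "100")  -> Seq("pkC")
    )

    val partKeys: Iterator[String] = tokenRanges.iterator.flatMap { range =>
      rowsPerRange(range).toSet.iterator           // duplicates within a range collapse to one
    }
    // partKeys yields each part key once per token range (ordering within a range is not guaranteed)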

/**
@@ -205,6 +205,23 @@ sealed class TimeSeriesChunksTable(val dataset: DatasetRef,
} yield rpd
}

/**
* Not an async call - limit number of partitions queried at a time
*/
def readRawPartitionRangeBBNoAsync(partitions: Seq[ByteBuffer],
startTime: Long,
endTimeExclusive: Long): Seq[RawPartData] = {
val query = readChunkRangeCql.bind().setList(0, partitions.asJava, classOf[ByteBuffer])
.setLong(1, chunkID(startTime, 0))
.setLong(2, chunkID(endTimeExclusive, 0))
session.execute(query).iterator().asScala
.map { row => (row.getBytes(0), chunkSetFromRow(row, 1)) }
.sortedGroupBy(_._1)
.map { case (partKeyBuffer, chunkSetIt) =>
RawPartData(partKeyBuffer.array, chunkSetIt.map(_._2).toBuffer)
}.toSeq
}
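
The grouping step relies on sortedGroupBy, which appears to be a FiloDB iterator extension that folds consecutive rows sharing a partition key into one group. Assuming that behaviour, an equivalent stand-alone version using only the standard library might look like the sketch below (groupSorted is a hypothetical name, not a FiloDB API):

    // Sketch: group consecutive (partKey, chunk) pairs from an already-ordered result set,
    // analogous to how readRawPartitionRangeBBNoAsync builds one RawPartData per partition.
    def groupSorted[K, V](rows: Iterator[(K, V)]): Iterator[(K, Seq[V])] = {
      val buffered = rows.buffered
      new Iterator[(K, Seq[V])] {
        def hasNext: Boolean = buffered.hasNext
        def next(): (K, Seq[V]) = {
          val key = buffered.head._1
          val group = scala.collection.mutable.ArrayBuffer[V]()
          while (buffered.hasNext && buffered.head._1 == key) group += buffered.next()._2
          (key, group.toSeq)
        }
      }
    }

    // groupSorted(Iterator(("p1", 1), ("p1", 2), ("p2", 3))).toList == List(("p1", Seq(1, 2)), ("p2", Seq(3)))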

def scanPartitionsBySplit(tokens: Seq[(String, String)]): Observable[RawPartData] = {

val res: Observable[Future[Iterator[RawPartData]]] = Observable.fromIterable(tokens).map { case (start, end) =>
@@ -16,8 +16,8 @@ import filodb.core._
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.store.{ChunkSet, ChunkSetInfo, ColumnStoreSpec, PartKeyRecord}
import filodb.memory.BinaryRegionLarge
import filodb.memory.format.UnsafeUtils
import filodb.memory.{BinaryRegionLarge, NativeMemoryManager}
import filodb.memory.format.{TupleRowReader, UnsafeUtils}
import filodb.memory.format.ZeroCopyUTF8String._

class CassandraColumnStoreSpec extends ColumnStoreSpec {
@@ -27,6 +27,21 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
lazy val colStore = new CassandraColumnStore(config, s, session)
lazy val metaStore = new CassandraMetaStore(config.getConfig("cassandra"), session)

val nativeMemoryManager = new NativeMemoryManager(100000000L, Map.empty)
val promDataset = Dataset("prometheus", Schemas.gauge)

// First create the tables in C*
override def beforeAll(): Unit = {
super.beforeAll()
colStore.initialize(promDataset.ref, 1).futureValue
colStore.truncate(promDataset.ref, 1).futureValue
}

override def afterAll(): Unit = {
super.afterAll()
nativeMemoryManager.shutdown()
}

"getScanSplits" should "return splits from Cassandra" in {
// Single split, token_start should equal token_end
val singleSplits = colStore.getScanSplits(dataset.ref).asInstanceOf[Seq[CassandraTokenRangeSplit]]
@@ -221,7 +236,42 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {

val parts = lz4ColStore.readRawPartitions(dataset.ref, 0.millis.toMillis, partScan).toListL.runAsync.futureValue
parts should have length (1)
parts(0).chunkSets should have length (1)
parts(0).chunkSets(0).vectors.toSeq shouldEqual sourceChunks.head.chunks
parts(0).chunkSetsTimeOrdered should have length (1)
parts(0).chunkSetsTimeOrdered(0).vectors.toSeq shouldEqual sourceChunks.head.chunks
}

"getChunksByIngestionTimeRangeNoAsync" should "batch partitions properly" in {

val gaugeName = "my_gauge"
val seriesTags = Map("_ws_".utf8 -> "my_ws".utf8)
val firstSampleTime = 74373042000L
val partBuilder = new RecordBuilder(nativeMemoryManager)
val ingestTime = 1594130687316L
val chunksets = for {
i <- 0 until 1050
partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
c <- 0 until 3
} yield {
val rows = Seq(TupleRowReader((Some(firstSampleTime + c), Some(0.0d))))
ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager)
}
colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue

val batches = colStore.getChunksByIngestionTimeRangeNoAsync(
promDataset.ref,
colStore.getScanSplits(promDataset.ref).iterator,
ingestTime - 1,
ingestTime + 1,
firstSampleTime - 1,
firstSampleTime + 5,
10L,
100
).toList

batches.size shouldEqual 11 // 100 partitions per batch, 1050 partitions => 11 batches
batches.zipWithIndex.foreach { case (b, i) =>
b.size shouldEqual (if (i == 10) 50 else 100)
b.foreach(_.chunkSetsTimeOrdered.size shouldEqual 3)
}
}
}
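
The batch counts asserted in the new test follow directly from sliding(100, 100) over 1050 partitions: ten full batches of 100 and a final batch of 50. A quick standard-library check of that arithmetic:

    // 1050 part keys batched 100 at a time => 11 batches, the last holding 50.
    val sizes = (0 until 1050).iterator.sliding(100, 100).map(_.size).toList
    assert(sizes.length == 11)
    assert(sizes.take(10).forall(_ == 100) && sizes.last == 50)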
195 changes: 195 additions & 0 deletions cassandra/src/test/scala/filodb.cassandra/columnstore/OdpSpec.scala
@@ -0,0 +1,195 @@
package filodb.cassandra.columnstore

import scala.concurrent.Future

import com.typesafe.config.ConfigFactory
import monix.execution.Scheduler
import monix.reactive.Observable
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}

import filodb.cassandra.DefaultFiloSessionProvider
import filodb.core.{MachineMetricsData, TestData}
import filodb.core.binaryrecord2.{BinaryRecordRowReader, RecordBuilder}
import filodb.core.downsample.OffHeapMemory
import filodb.core.memstore._
import filodb.core.memstore.FiloSchedulers.QuerySchedName
import filodb.core.metadata.{Dataset, Schemas}
import filodb.core.query.{ColumnFilter, QueryConfig, QueryContext, QuerySession}
import filodb.core.query.Filter.Equals
import filodb.core.store.{InMemoryMetaStore, PartKeyRecord, StoreConfig, TimeRangeChunkScan}
import filodb.memory.format.ZeroCopyUTF8String._
import filodb.query.{QueryResponse, QueryResult}
import filodb.query.exec.{InProcessPlanDispatcher, MultiSchemaPartitionsExec}

class OdpSpec extends FunSpec with Matchers with BeforeAndAfterAll with ScalaFutures {

implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))

val config = ConfigFactory.load("application_test.conf").getConfig("filodb")

implicit val s = monix.execution.Scheduler.Implicits.global
lazy val session = new DefaultFiloSessionProvider(config.getConfig("cassandra")).session
lazy val colStore = new CassandraColumnStore(config, s, session)

val rawDataStoreConfig = StoreConfig(ConfigFactory.parseString( """
|flush-interval = 1h
|shard-mem-size = 1MB
|ingestion-buffer-mem-size = 30MB
""".stripMargin))

val offheapMem = new OffHeapMemory(Seq(Schemas.gauge),
Map.empty, 100, rawDataStoreConfig)
val schemas = Schemas(Schemas.gauge)

val dataset = Dataset("prometheus", Schemas.gauge)
val gaugeName = "my_gauge"
var gaugePartKeyBytes: Array[Byte] = _
val seriesTags = Map("_ws_".utf8 -> "my_ws".utf8, "_ns_".utf8 -> "my_ns".utf8)
val shardStats = new TimeSeriesShardStats(dataset.ref, -1)

val firstSampleTime = 74373042000L
val numSamples = 100
val queryScheduler = Scheduler.fixedPool(s"$QuerySchedName", 3)

// First create the tables in C*
override def beforeAll(): Unit = {
super.beforeAll()
colStore.initialize(dataset.ref, 1).futureValue
colStore.truncate(dataset.ref, 1).futureValue
}

override def afterAll(): Unit = {
super.afterAll()
queryScheduler.shutdown()
offheapMem.free()
}

it ("should write gauge data to cassandra") {
val partBuilder = new RecordBuilder(offheapMem.nativeMemoryManager)
val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName, seriesTags)

val part = new TimeSeriesPartition(0, Schemas.gauge, partKey,
0, offheapMem.bufferPools(Schemas.gauge.schemaHash), shardStats,
offheapMem.nativeMemoryManager, 1)

gaugePartKeyBytes = part.partKeyBytes

val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

MachineMetricsData.records(dataset, rawSamples).records.foreach { case (base, offset) =>
val rr = new BinaryRecordRowReader(Schemas.gauge.ingestionSchema, base, offset)
part.ingest(System.currentTimeMillis(), rr, offheapMem.blockMemFactory)
part.switchBuffers(offheapMem.blockMemFactory, true)
}
val chunks = part.makeFlushChunks(offheapMem.blockMemFactory)

colStore.write(dataset.ref, Observable.fromIterator(chunks)).futureValue
val pk = PartKeyRecord(gaugePartKeyBytes, firstSampleTime, firstSampleTime + numSamples, Some(150))
colStore.writePartKeys(dataset.ref, 0, Observable.now(pk), 259200, 34).futureValue
}

it ("should be able to do full ODP for non concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

val rvs = query(memStore).futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples
} finally {
memStore.shutdown()
}
}

it ("should be able to do full ODP for concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// issue 2 concurrent queries
val res = (0 to 1).map(_ => query(memStore))

// all queries should result in all chunks
res.foreach { r =>
val rvs = r.futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples
}
} finally {
memStore.shutdown()
}
}

it ("should be able to do partial ODP for non concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// ingest some more samples to trigger partial ODP
val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + numSamples + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

memStore.ingest(dataset.ref, 0, SomeData(MachineMetricsData.records(dataset, rawSamples).records, 300))

val rvs = query(memStore).futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples * 2
} finally {
memStore.shutdown()
}
}

it ("should be able to do partial ODP for concurrent queries") {
val policy = new FixedMaxPartitionsEvictionPolicy(20)
val memStore = new TimeSeriesMemStore(config, colStore, new InMemoryMetaStore(), Some(policy))
try {
memStore.setup(dataset.ref, schemas, 0, TestData.storeConf)
memStore.recoverIndex(dataset.ref, 0).futureValue
memStore.refreshIndexForTesting(dataset.ref)

// ingest some more samples to trigger partial ODP
val rawSamples = Stream.from(0).map { i =>
Seq(firstSampleTime + numSamples + i, i.toDouble, gaugeName, seriesTags)
}.take(numSamples)

memStore.ingest(dataset.ref, 0, SomeData(MachineMetricsData.records(dataset, rawSamples).records, 300))

// issue 2 concurrent queries
val res = (0 to 1).map(_ => query(memStore))

// all queries should result in all chunks
res.foreach { r =>
val rvs = r.futureValue.asInstanceOf[QueryResult]
rvs.result.size shouldEqual 1
rvs.result.head.rows.toList.size shouldEqual numSamples * 2
}
} finally {
memStore.shutdown()
}
}

def query(memStore: TimeSeriesMemStore): Future[QueryResponse] = {
val colFilters = seriesTags.map { case (t, v) => ColumnFilter(t.toString, Equals(v.toString)) }.toSeq
val queryFilters = colFilters :+ ColumnFilter("_metric_", Equals(gaugeName))
val exec = MultiSchemaPartitionsExec(QueryContext(sampleLimit = numSamples * 2), InProcessPlanDispatcher,
dataset.ref, 0, queryFilters, TimeRangeChunkScan(firstSampleTime, firstSampleTime + 2 * numSamples))
val queryConfig = new QueryConfig(config.getConfig("query"))
val querySession = QuerySession(QueryContext(), queryConfig)
exec.execute(memStore, querySession)(queryScheduler).runAsync(queryScheduler)
}
}
