Merge pull request #302 from tjackpaul/integration
Merge Develop to 0.8.4-integration
sherali42 authored Mar 28, 2019
2 parents 3f331b9 + fbb958c commit f9b2c77
Showing 147 changed files with 5,213 additions and 2,317 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -5,7 +5,7 @@ env:
scala:
- 2.11.12
jdk:
- oraclejdk8
- openjdk11

# These directories are cached to S3 at the end of the build
cache:
17 changes: 16 additions & 1 deletion README.md
@@ -29,7 +29,7 @@ and [filodb-discuss](https://groups.google.com/forum/#!forum/filodb-discuss) goo
- [Getting Started](#getting-started)
- [End to End Kafka Developer Setup](#end-to-end-kafka-developer-setup)
- [Using the Gateway to stream Application Metrics](#using-the-gateway-to-stream-application-metrics)
- [Multiple Servers](#multiple-servers)
- [Multiple Servers using Consul](#multiple-servers-using-consul)
- [Local Scale Testing](#local-scale-testing)
- [Understanding the FiloDB Data Model](#understanding-the-filodb-data-model)
- [Prometheus FiloDB Schema for Operational Metrics](#prometheus-filodb-schema-for-operational-metrics)
@@ -38,6 +38,7 @@ and [filodb-discuss](https://groups.google.com/forum/#!forum/filodb-discuss) goo
- [Sharding](#sharding)
- [Querying FiloDB](#querying-filodb)
- [FiloDB PromQL Extensions](#filodb-promql-extensions)
- [First-Class Histogram Support](#first-class-histogram-support)
- [Using the FiloDB HTTP API](#using-the-filodb-http-api)
- [Grafana setup](#grafana-setup)
- [Using the CLI](#using-the-cli)
@@ -320,6 +321,8 @@ FiloDB is designed to scale to ingest and query millions of discrete time series

The **partition key** differentiates time series and also controls distribution of time series across the cluster. For more information on sharding, see the sharding section below. Components of a partition key, including individual key/values of `MapColumn`s, are indexed and used for filtering in queries.

Data points use a configurable schema consisting of multiple columns. Each column definition has the form `name:columntype`, with optional parameters. See the schema examples below, or the introductory walk-through above where two datasets are created.

### Prometheus FiloDB Schema for Operational Metrics

* Partition key = `tags:map`
@@ -383,6 +386,18 @@ Example of debugging chunk metadata using the CLI:

./filo-cli --host 127.0.0.1 --dataset prometheus --promql '_filodb_chunkmeta_all(heap_usage{_ns="App-0"})' --start XX --end YY

### First-Class Histogram Support

One major difference between FiloDB and the Prometheus data model is that FiloDB supports histograms as a first-class entity. In Prometheus, each histogram bucket is stored as its own time series, differentiated by the `le` tag. In FiloDB, a `HistogramColumn` stores all the buckets together, which yields much better compression (especially over the wire during ingestion) and query speeds up to two orders of magnitude faster. There is no `le` tag and no individual time series per bucket. Here are the differences users need to be aware of when using `HistogramColumn`:

* There is no need to append `_bucket` to the metric name.
* However, you do need to select the histogram column explicitly, e.g. `__col__="hist"`
* To compute quantiles: `histogram_quantile(0.7, sum_over_time(http_req_latency{app="foo",__col__="hist"}[5m]))`
* To extract a bucket: `histogram_bucket(100.0, http_req_latency{app="foo",__col__="hist"})`
* Sum over multiple histogram time series: `sum(sum_over_time(http_req_latency{app="foo",__col__="hist"}[5m]))` - you can then compute a quantile over the sum (see the sketch after this list).
- NOTE: Do NOT use `group by (le)` when summing `HistogramColumn`s; the `le` tag is not used, and FiloDB knows how to sum multiple histograms together correctly without grouping tricks.
- Using `HistogramColumn` also prevents many of the incorrect histogram aggregations possible in Prometheus, such as summing histograms whose bucket schemes differ across time series or over time.
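
To make the last two points concrete, here is a minimal conceptual sketch (plain Scala, not FiloDB's internal API) of why histograms stored with their buckets together can simply be added element-wise, assuming every histogram shares the same bucket scheme:

```scala
// Conceptual sketch only: a histogram as one shared bucket scheme (the "le" upper bounds)
// plus an array of cumulative bucket counts.
final case class Hist(bucketTops: Array[Double], counts: Array[Long])

// Summing histograms with identical bucket schemes is element-wise addition of counts --
// no per-bucket time series and no `group by (le)` needed.
def sumHistograms(hists: Seq[Hist]): Hist = {
  require(hists.nonEmpty && hists.forall(_.bucketTops.sameElements(hists.head.bucketTops)),
          "all histograms must share the same bucket scheme")
  val summedCounts = hists.map(_.counts)
                          .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
  Hist(hists.head.bucketTops, summedCounts)
}
```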

### Using the FiloDB HTTP API

Please see the [HTTP API](doc/http_api.md) doc.
@@ -68,7 +68,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.initialize()
rmtResp <- indexTable.initialize()
pitResp <- partIndexTable.initialize() } yield rmtResp
pitResp <- partIndexTable.initialize() } yield pitResp
}

def truncate(dataset: DatasetRef): Future[Response] = {
@@ -79,7 +79,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.clearAll()
rmtResp <- indexTable.clearAll()
pitResp <- partIndexTable.clearAll() } yield rmtResp
pitResp <- partIndexTable.clearAll() } yield pitResp
}

def dropDataset(dataset: DatasetRef): Future[Response] = {
@@ -94,7 +94,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
chunkTableCache.remove(dataset)
indexTableCache.remove(dataset)
partitionIndexTableCache.remove(dataset)
rmtResp
pitResp
}
}

@@ -46,6 +46,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
memStore.store.sinkStats.chunksetsWritten should be >= 3
memStore.store.sinkStats.chunksetsWritten should be <= 4

memStore.commitIndexForTesting(dataset1.ref)
// Verify data still in MemStore... all of it
val splits = memStore.getScanSplits(dataset1.ref, 1)
val agg1 = memStore.scanRows(dataset1, Seq(1), FilteredPartitionScan(splits.head))
4 changes: 2 additions & 2 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -408,11 +408,11 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
case LongColumn => rowReader.getLong(position).toString
case DoubleColumn => rowReader.getDouble(position).toString
case StringColumn => rowReader.filoUTF8String(position).toString
case BitmapColumn => rowReader.getBoolean(position).toString
case TimestampColumn => rowReader.as[Timestamp](position).toString
case HistogramColumn => rowReader.getHistogram(position).toString
case _ => throw new UnsupportedOperationException("Unsupported type: " + columns(position).columnType)
}
content.update(position,value)
content.update(position, value)
position += 1
}
content
9 changes: 6 additions & 3 deletions conf/logback-dev.xml
@@ -8,11 +8,14 @@
</encoder>
</appender>

<logger name="filodb.coordinator" level="TRACE" />
<logger name="filodb.core" level="TRACE" />
<logger name="filodb.coordinator" level="DEBUG" />
<logger name="filodb.core" level="DEBUG" />
<logger name="filodb.memory" level="DEBUG" />
<logger name="filodb.query" level="DEBUG" />
<logger name="com.esotericsoftware.minlog" level="DEBUG" />
<logger name="filodb.coordinator.KamonMetricsLogReporter" level="off" />
<logger name="filodb.coordinator.KamonSpanLogReporter" level="off" />
<logger name="filodb.core.memstore.LuceneMetricsRouter" level="off" />
<!-- <logger name="com.esotericsoftware.minlog" level="DEBUG" /> -->
<!-- <logger name="com.esotericsoftware.kryo.io" level="TRACE" /> -->

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="INFO"/>
17 changes: 17 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -29,6 +29,10 @@

max-chunks-size = 400

# Write buffer size, in bytes, for blob columns (histograms, UTF8Strings). Since these are variable data types,
# we need a maximum size, not a maximum number of items.
max-blob-buffer-size = 15000

# Number of bytes of offheap mem to allocate to chunk storage in each shard. Ex. 1000MB, 1G, 2GB
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB
@@ -55,6 +59,19 @@

# Amount of time to delay before retrying
# retry-delay = 15s

# Capacity of Bloom filter used to track evicted partitions.
# Tune this based on how much time series churn is expected before a FiloDB node
# will be restarted for upgrade/maintenance. Do not take into account churn created by
# time series that are purged due to retention. When a time series has not ingested for the retention
# period, it is purged, not evicted. Purged PartKeys are not added to the Bloom filter.
#
# To calculate Bloom Filter size:
# console> BloomFilter[String](5000000, falsePositiveRate = 0.01).numberOfBits
# res9: Long = 47925292
# That's about 6 MB
evicted-pk-bloom-filter-capacity = 50000

}
downsample {
# can be disabled by setting this flag to false
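
The Bloom filter sizing shown in the comment above can be reproduced with a short sketch. This assumes the `bloomfilter.mutable.BloomFilter` class from the bloom-filter-scala library, matching the console call in the comment; the import path and dependency are assumptions, not part of this change:

```scala
import bloomfilter.mutable.BloomFilter

object BloomFilterSizing extends App {
  // Size a filter for 5 million evicted partition keys at a 1% false-positive rate,
  // as in the console example in the config comment above.
  val bf = BloomFilter[String](5000000L, falsePositiveRate = 0.01)
  val bits = bf.numberOfBits
  println(f"$bits bits, about ${bits / 8.0 / 1e6}%.1f MB")   // ~47.9M bits, roughly 6 MB
  bf.dispose()   // the filter allocates off-heap memory, so free it explicitly
}
```
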
@@ -17,6 +17,7 @@ import filodb.core.downsample.DownsampleConfig
import filodb.core.metadata.Dataset
import filodb.core.store.{AssignShardConfig, IngestionConfig, MetaStore, StoreConfig, UnassignShardConfig}

//scalastyle:off number.of.types
object NodeClusterActor {

sealed trait ClusterActorEvent
@@ -79,6 +80,9 @@
source.downsampleConfig)
}

// Only used during initial setup, called during recovery and before calling initiateShardStateRecovery.
private final case class SetupDatasetFinished(ref: DatasetRef)

// A dummy source to use for tests and when you just want to push new records in
val noOpSource = IngestionSource(classOf[NoOpStreamFactory].getName)

@@ -314,10 +318,11 @@ private[filodb] class NodeClusterActor(settings: FilodbSettings,
// The initial recovery handler: recover dataset setup/ingestion config first
def datasetHandler: Receive = LoggingReceive {
case e: SetupDataset =>
setupDataset(e, sender()) map { _ =>
initDatasets -= e.ref
if (initDatasets.isEmpty) initiateShardStateRecovery()
}
setupDataset(e, sender()) map { _ => self ! SetupDatasetFinished(e.ref) }
case e: SetupDatasetFinished => {
initDatasets -= e.ref
if (initDatasets.isEmpty) initiateShardStateRecovery()
}
case GetDatasetFromRef(r) => sender() ! datasets(r)
}

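The `datasetHandler` change above replaces mutation of `initDatasets` inside a `Future` callback with a `SetupDatasetFinished` message sent to `self`, so actor state is only touched on the actor's own thread. A generic, hypothetical sketch of that pattern (the names below are illustrative, not FiloDB code):

```scala
import akka.actor.Actor
import scala.concurrent.Future

class SetupTracker extends Actor {
  import context.dispatcher                            // ExecutionContext for the Future

  private case class Finished(name: String)
  private var pending = Set("dataset-a", "dataset-b")  // hypothetical work items

  def receive: Receive = {
    case name: String =>
      // Run the async setup off the actor thread, then notify ourselves.
      // Never mutate `pending` inside the Future callback itself.
      Future { /* async setup for `name` */ name }
        .map(Finished(_))
        .foreach { msg => self ! msg }
    case Finished(name) =>
      pending -= name                                  // safe: runs on the actor's own thread
      if (pending.isEmpty) context.stop(self)
  }
}
```
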
@@ -226,12 +226,23 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
}
}

private def aliveIngesters: Seq[(DatasetRef, ActorRef)] = {
val kids = context.children.toBuffer
ingesters.toSeq.filter { case (dsRef, actorRef) => kids contains actorRef }
}

private def reset(origin: ActorRef): Unit = {
ingesters.values.foreach(_ ! PoisonPill)
queryActors.values.foreach(_ ! PoisonPill)
ingesters.clear()
queryActors.clear()
memStore.reset()

// Wait for all ingestor children to die
while (aliveIngesters.nonEmpty) {
logger.info(s"In reset, waiting for children to die.... ingesters=$ingesters children=${context.children}")
Thread sleep 250
}
origin ! NodeProtocol.StateReset
}

@@ -449,6 +449,7 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
// Above condition ensures that we respond to shard events only from the node shard is currently assigned to.
// Needed to avoid race conditions where IngestionStopped for an old assignment comes after shard is reassigned.
updateFromShardEvent(event)
publishSnapshot(event.ref)
// reassign shard if IngestionError. Exclude previous node since it had error shards.
event match {
case _: IngestionError =>
@@ -4,38 +4,12 @@ import com.esotericsoftware.kryo.{Serializer => KryoSerializer}
import com.esotericsoftware.kryo.Kryo
import com.typesafe.scalalogging.StrictLogging

import filodb.core.binaryrecord.{BinaryRecord, RecordSchema}
import filodb.core.binaryrecord2.{RecordSchema => RecordSchema2}
import filodb.core.query.{ColumnInfo, PartitionInfo, PartitionRangeVectorKey}
import filodb.memory.format._

// NOTE: This file has to be in the kryo namespace so we can use the require() method

/**
* Serializer for BinaryRecords. One complication with BinaryRecords is that they require a schema.
* We don't want to instantiate a new RecordSchema with every single BR, that would be a huge waste of memory.
* However, it seems that Kryo remembers RecordSchema references, and if the same RecordSchema is used for multiple
* BinaryRecords in an object graph (say a VectorListResult or TupleListResult) then it will be stored by some
* reference ID. Thus it saves us cost and memory allocations on restore. :)
*/
class BinaryRecordSerializer extends KryoSerializer[BinaryRecord] with StrictLogging {
override def read(kryo: Kryo, input: Input, typ: Class[BinaryRecord]): BinaryRecord = {
val schema = kryo.readObject(input, classOf[RecordSchema])
val bytes = input.readBytes(input.readInt)
BinaryRecord(schema, bytes)
}

override def write(kryo: Kryo, output: Output, br: BinaryRecord): Unit = {
kryo.writeObject(output, br.schema)
output.writeInt(br.numBytes)
// It would be simpler if we simply took the bytes from ArrayBinaryRecord and wrote them, but
// BinaryRecords might go offheap.
output.require(br.numBytes)
br.copyTo(output.getBuffer, UnsafeUtils.arayOffset + output.position)
output.setPosition(output.position + br.numBytes)
}
}

object BinaryRegionUtils extends StrictLogging {
def writeLargeRegion(base: Any, offset: Long, output: Output): Unit = {
val numBytes = UnsafeUtils.getInt(base, offset)
@@ -63,7 +37,7 @@ class PartitionRangeVectorKeySerializer extends KryoSerializer[PartitionRangeVec
val schema = kryo.readObject(input, classOf[RecordSchema2])
val keyCols = kryo.readClassAndObject(input)
PartitionRangeVectorKey(partBytes, UnsafeUtils.arayOffset,
schema, keyCols.asInstanceOf[Seq[ColumnInfo]], input.readInt, input.readInt)
schema, keyCols.asInstanceOf[Seq[ColumnInfo]], input.readInt, input.readInt, input.readInt)
}

override def write(kryo: Kryo, output: Output, key: PartitionRangeVectorKey): Unit = {
@@ -72,6 +46,7 @@
kryo.writeClassAndObject(output, key.partKeyCols)
output.writeInt(key.sourceShard)
output.writeInt(key.groupNum)
output.writeInt(key.partId)
}
}

@@ -32,36 +32,43 @@ object QueryCommands {
limit: Int = 100,
submitTime: Long = System.currentTimeMillis()) extends QueryCommand

final case class QueryOptions(spreadFunc: Seq[ColumnFilter] => Int = { x => 1 },

final case class SpreadChange(time: Long = 0L, spread: Int = 1)

/**
* This class provides general query processing parameters
* @param spreadFunc a function that returns chronologically ordered spread changes for the filter
*/
final case class QueryOptions(spreadFunc: Seq[ColumnFilter] => Seq[SpreadChange] = { _ => Seq(SpreadChange()) },
parallelism: Int = 16,
queryTimeoutSecs: Int = 30,
sampleLimit: Int = 1000000,
shardOverrides: Option[Seq[Int]] = None)

object QueryOptions {
def apply(constSpread: Int, sampleLimit: Int): QueryOptions =
QueryOptions(spreadFunc = { x => constSpread}, sampleLimit = sampleLimit)
QueryOptions(spreadFunc = { _ => Seq(SpreadChange(spread = constSpread))}, sampleLimit = sampleLimit)

/**
* Creates a spreadFunc that looks for a particular filter with keyName Equals a value, and then maps values
* present in the spreadMap to specific spread values, with a default if the filter/value not present in the map
*/
def simpleMapSpreadFunc(keyName: String,
spreadMap: collection.Map[String, Int],
defaultSpread: Int): Seq[ColumnFilter] => Int = {
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] = {
filters: Seq[ColumnFilter] =>
filters.collect {
filters.collectFirst {
case ColumnFilter(key, Filter.Equals(filtVal: String)) if key == keyName => filtVal
}.headOption.map { tagValue =>
spreadMap.getOrElse(tagValue, defaultSpread)
}.getOrElse(defaultSpread)
}.map { tagValue =>
Seq(SpreadChange(spread = spreadMap.getOrElse(tagValue, defaultSpread)))
}.getOrElse(Seq(SpreadChange(defaultSpread)))
}

import collection.JavaConverters._

def simpleMapSpreadFunc(keyName: String,
spreadMap: java.util.Map[String, Int],
defaultSpread: Int): Seq[ColumnFilter] => Int =
defaultSpread: Int): Seq[ColumnFilter] => Seq[SpreadChange] =
simpleMapSpreadFunc(keyName, spreadMap.asScala, defaultSpread)
}

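For reference, here is a hedged usage sketch of the new `Seq[SpreadChange]`-returning spread function, based only on the signatures visible in this diff; the package path in the import, the `_ns` filter key, and the spread values are illustrative assumptions:

```scala
// Adjust the import to wherever QueryCommands actually lives in your build.
import filodb.coordinator.client.QueryCommands.QueryOptions

object SpreadExample {
  // Queries whose filters include an Equals("_ns", "App-0") filter get spread 2;
  // everything else falls back to the default spread of 1.
  val spreadFunc = QueryOptions.simpleMapSpreadFunc("_ns", Map("App-0" -> 2), defaultSpread = 1)

  val opts = QueryOptions(spreadFunc = spreadFunc, sampleLimit = 1000000)
}
```
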
@@ -6,7 +6,6 @@ import com.esotericsoftware.kryo.io._
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer

import filodb.core._
import filodb.core.binaryrecord.{ArrayBinaryRecord, BinaryRecord, RecordSchema}
import filodb.core.binaryrecord2.{RecordSchema => RecordSchema2}
import filodb.core.metadata.Column
import filodb.core.query.ColumnInfo
@@ -35,10 +34,7 @@ class KryoInit {
val colTypeSer = new ColumnTypeSerializer
Column.ColumnType.values.zipWithIndex.foreach { case (ct, i) => kryo.register(ct.getClass, colTypeSer, 100 + i) }

kryo.addDefaultSerializer(classOf[RecordSchema], classOf[RecordSchemaSerializer])
kryo.addDefaultSerializer(classOf[RecordSchema2], classOf[RecordSchema2Serializer])
kryo.addDefaultSerializer(classOf[BinaryRecord], classOf[BinaryRecordSerializer])

kryo.addDefaultSerializer(classOf[ZeroCopyUTF8String], classOf[ZeroCopyUTF8StringSerializer])

initOtherFiloClasses(kryo)
@@ -71,9 +67,6 @@ class KryoInit {
def initOtherFiloClasses(kryo: Kryo): Unit = {
// Initialize other commonly used FiloDB classes
kryo.register(classOf[DatasetRef])
kryo.register(classOf[BinaryRecord])
kryo.register(classOf[ArrayBinaryRecord])
kryo.register(classOf[RecordSchema])
kryo.register(classOf[RecordSchema2])
kryo.register(classOf[filodb.coordinator.ShardEvent])
kryo.register(classOf[filodb.coordinator.CurrentShardSnapshot])
@@ -82,17 +75,14 @@

import filodb.core.query._
kryo.register(classOf[PartitionInfo], new PartitionInfoSerializer)
kryo.register(classOf[Tuple])
kryo.register(classOf[ColumnInfo])
kryo.register(classOf[TupleResult])
kryo.register(classOf[TupleListResult])
kryo.register(classOf[ColumnFilter])

import filodb.core.store._
kryo.register(classOf[ChunkSetInfo])
kryo.register(WriteBufferChunkScan.getClass)
kryo.register(AllChunkScan.getClass)
kryo.register(classOf[RowKeyChunkScan])
kryo.register(classOf[TimeRangeChunkScan])
kryo.register(classOf[FilteredPartitionScan])
kryo.register(classOf[ShardSplit])

@@ -111,17 +101,6 @@ class ColumnTypeSerializer extends KryoSerializer[Column.ColumnType] {
override def write(kryo: Kryo, output: Output, colType: Column.ColumnType): Unit = {}
}

class RecordSchemaSerializer extends KryoSerializer[RecordSchema] {
override def read(kryo: Kryo, input: Input, typ: Class[RecordSchema]): RecordSchema = {
val colTypesObj = kryo.readClassAndObject(input)
new RecordSchema(colTypesObj.asInstanceOf[Seq[Column.ColumnType]])
}

override def write(kryo: Kryo, output: Output, schema: RecordSchema): Unit = {
kryo.writeClassAndObject(output, schema.columnTypes)
}
}

class RecordSchema2Serializer extends KryoSerializer[RecordSchema2] {
override def read(kryo: Kryo, input: Input, typ: Class[RecordSchema2]): RecordSchema2 = {
val tuple = kryo.readClassAndObject(input)