
Commit

Merge branch '0.8.4-integration' to master 0.8.4 release
sherali42 committed Apr 15, 2019
2 parents 847e0a6 + b1bb604 commit 60cab08
Showing 155 changed files with 5,660 additions and 2,533 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -5,7 +5,7 @@ env:
scala:
- 2.11.12
jdk:
- oraclejdk8
- openjdk11

# These directories are cached to S3 at the end of the build
cache:
29 changes: 23 additions & 6 deletions README.md
@@ -29,7 +29,7 @@ and [filodb-discuss](https://groups.google.com/forum/#!forum/filodb-discuss) goo
- [Getting Started](#getting-started)
- [End to End Kafka Developer Setup](#end-to-end-kafka-developer-setup)
- [Using the Gateway to stream Application Metrics](#using-the-gateway-to-stream-application-metrics)
- [Multiple Servers](#multiple-servers)
- [Multiple Servers using Consul](#multiple-servers-using-consul)
- [Local Scale Testing](#local-scale-testing)
- [Understanding the FiloDB Data Model](#understanding-the-filodb-data-model)
- [Prometheus FiloDB Schema for Operational Metrics](#prometheus-filodb-schema-for-operational-metrics)
@@ -38,6 +38,7 @@ and [filodb-discuss](https://groups.google.com/forum/#!forum/filodb-discuss) goo
- [Sharding](#sharding)
- [Querying FiloDB](#querying-filodb)
- [FiloDB PromQL Extensions](#filodb-promql-extensions)
- [First-Class Histogram Support](#first-class-histogram-support)
- [Using the FiloDB HTTP API](#using-the-filodb-http-api)
- [Grafana setup](#grafana-setup)
- [Using the CLI](#using-the-cli)
@@ -201,10 +202,10 @@ You can also check the server logs at `logs/filodb-server-N.log`.
Now run the time series generator. This will ingest 20 time series (the default) with 100 samples each into the Kafka topic with current timestamps. The required argument is the path to the source config. Use `--help` for all the options.
```
java -cp gateway/target/scala-2.11/gateway-*-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-dev-source.conf
./dev-gateway.sh --gen-prom-data conf/timeseries-dev-source.conf
```
NOTE: The `TestTimeseriesProducer` logs to logs/gateway-server.log.
NOTE: Check logs/gateway-server.log for logs.
At this point, you should see a message like this in the server logs: `KAMON counter name=memstore-rows-ingested count=4999`
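For a quick check from a shell (log file naming follows `logs/filodb-server-N.log` as noted above; the exact count will vary with how much data was generated):

```
grep "memstore-rows-ingested" logs/filodb-server-*.log
```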
@@ -219,8 +220,9 @@ You can also look at Cassandra to check for persisted data. Look at the tables i
If the above does not work, try the following:
1) Delete the Kafka topic and re-create it (a command sketch follows this list). Note that Kafka topic deletion might not happen until the server is stopped and restarted
1a) Restart Kafka, this is sometimes necessary.
2) `./filodb-dev-stop.sh` and restart filodb instances like above
3) Re-run the `TestTimeseriesProducer`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.
3) Re-run `./dev-gateway.sh --gen-prom-data`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.
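For step 1 above, a minimal sketch of deleting and re-creating the topic; the topic name and partition count are assumptions based on `conf/timeseries-dev-source.conf`, and the `--zookeeper` flag may need to be swapped for `--bootstrap-server` depending on your Kafka version:

```
kafka-topics.sh --zookeeper localhost:2181 --delete --topic timeseries-dev
kafka-topics.sh --zookeeper localhost:2181 --create --topic timeseries-dev --partitions 4 --replication-factor 1
```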
To stop the dev server, run the following. Note that this will stop all the FiloDB servers if multiple are running.
```
./filodb-dev-stop.sh
```
@@ -307,7 +309,7 @@ Now if you curl the cluster status you should see 128 shards which are slowly tu
Generate records:

```
java -cp gateway/target/scala-2.11/gateway-*.telemetry-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-128shards-source.conf -p 5000
./dev-gateway.sh --gen-prom-data -p 5000 conf/timeseries-128shards-source.conf
```

## Understanding the FiloDB Data Model
@@ -320,6 +322,8 @@ FiloDB is designed to scale to ingest and query millions of discrete time series

The **partition key** differentiates time series and also controls distribution of time series across the cluster. For more information on sharding, see the sharding section below. Components of a partition key, including individual key/values of `MapColumn`s, are indexed and used for filtering in queries.
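For instance, a query can filter directly on partition-key tag values (a sketch only; `heap_usage` and the `_ns` tag appear in the CLI example later in this README, while `host` is a hypothetical additional tag):

```
heap_usage{_ns="App-0", host="host-1"}
```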

The data points use a configurable schema consisting of multiple columns. Each column definition has the form `name:columntype`, with optional parameters. See the examples below, or the introductory walk-through above where two datasets are created.

### Prometheus FiloDB Schema for Operational Metrics

* Partition key = `tags:map`
@@ -383,6 +387,18 @@ Example of debugging chunk metadata using the CLI:

./filo-cli --host 127.0.0.1 --dataset prometheus --promql '_filodb_chunkmeta_all(heap_usage{_ns="App-0"})' --start XX --end YY

### First-Class Histogram Support

One major difference between FiloDB and the Prometheus data model is that FiloDB supports histograms as a first-class entity. In Prometheus, each histogram bucket is stored as its own time series, differentiated by the `le` tag. In FiloDB, a `HistogramColumn` stores all the buckets together, which yields significantly better compression (especially over the wire during ingestion) as well as much faster query speeds (up to two orders of magnitude). There is no "le" tag or individual time series for each bucket. Here are the differences users need to be aware of when using `HistogramColumn` (a brief query comparison follows the list):

* There is no need to append `_bucket` to the metric name.
* However, you need to select the histogram column like `__col__="hist"`
* To compute quantiles: `histogram_quantile(0.7, sum_over_time(http_req_latency{app="foo",__col__="hist"}[5m]))`
* To extract a bucket: `histogram_bucket(100.0, http_req_latency{app="foo",__col__="hist"})`
* Sum over multiple Histogram time series: `sum(sum_over_time(http_req_latency{app="foo",__col__="hist"}[5m]))` - you could then compute quantile over the sum.
- NOTE: Do NOT use `group by (le)` when summing `HistogramColumns`. This is not appropriate as the "le" tag is not used. FiloDB knows how to sum multiple histograms together correctly without grouping tricks.
  - When using `HistogramColumn`, FiloDB prevents many of the incorrect histogram aggregations that can occur in Prometheus, such as mishandling of multiple histogram schemas across time series and across time.
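As a rough side-by-side comparison (the FiloDB form follows the bullets above; the Prometheus form is the conventional `le`-based pattern and is shown only for contrast, it is not a FiloDB query):

```
# Prometheus data model: each bucket is its own series, so the quantile aggregates by le
histogram_quantile(0.7, sum(rate(http_req_latency_bucket{app="foo"}[5m])) by (le))

# FiloDB HistogramColumn: select the histogram column directly; no le grouping is needed
histogram_quantile(0.7, sum(sum_over_time(http_req_latency{app="foo",__col__="hist"}[5m])))
```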

### Using the FiloDB HTTP API

Please see the [HTTP API](doc/http_api.md) doc.
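A minimal sketch of calling the API with curl; the port and route below are assumptions for a local dev setup and should be checked against doc/http_api.md:

```
# assumed dev defaults: HTTP server on port 8080, dataset named "prometheus"
curl http://localhost:8080/api/v1/cluster/prometheus/status
```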
@@ -574,7 +590,8 @@ The `filo-cli` accepts arguments and options as key-value pairs, specified like
| minutes | A shortcut to set the start at N minutes ago, and the stop at current time. Should specify a step also. |
| chunks | Either "memory" or "buffers" to select either all the in-memory chunks or the write buffers only. Should specify a step also. |
| database | Specifies the "database" the dataset should operate in. For Cassandra, this is the keyspace. If not specified, uses config value. |
| limit | The maximum number of samples per time series |
| limit | Limits the number of time series in the output |
| sampleLimit | Maximum number of output samples in the query result. An exception is thrown if the output returns more results than this. |
| shards | (EXPERT) overrides the automatic shard calculation by passing in a comma-separated list of specific shards to query. Very useful to debug sharding issues. |
| everyNSeconds | Repeats the query every (argument) seconds |
| timeoutSeconds | The number of seconds for the network timeout |
@@ -68,7 +68,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.initialize()
rmtResp <- indexTable.initialize()
pitResp <- partIndexTable.initialize() } yield rmtResp
pitResp <- partIndexTable.initialize() } yield pitResp
}

def truncate(dataset: DatasetRef): Future[Response] = {
@@ -79,7 +79,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
clusterMeta.checkSchemaAgreement()
for { ctResp <- chunkTable.clearAll()
rmtResp <- indexTable.clearAll()
pitResp <- partIndexTable.clearAll() } yield rmtResp
pitResp <- partIndexTable.clearAll() } yield pitResp
}

def dropDataset(dataset: DatasetRef): Future[Response] = {
@@ -94,7 +94,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
chunkTableCache.remove(dataset)
indexTableCache.remove(dataset)
partitionIndexTableCache.remove(dataset)
rmtResp
pitResp
}
}

@@ -46,6 +46,7 @@ class MemstoreCassandraSinkSpec extends AllTablesTest {
memStore.store.sinkStats.chunksetsWritten should be >= 3
memStore.store.sinkStats.chunksetsWritten should be <= 4

memStore.commitIndexForTesting(dataset1.ref)
// Verify data still in MemStore... all of it
val splits = memStore.getScanSplits(dataset1.ref, 1)
val agg1 = memStore.scanRows(dataset1, Seq(1), FilteredPartitionScan(splits.head))
16 changes: 8 additions & 8 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -34,9 +34,9 @@ class Arguments extends FieldArgs {
var rowKeys: Seq[String] = Seq("timestamp")
var partitionKeys: Seq[String] = Nil
var select: Option[Seq[String]] = None
// max # query items (vectors or tuples) returned. Don't make it too high.
var limit: Int = 1000
var sampleLimit: Int = 200
// max # of RangeVectors returned. Don't make it too high.
var limit: Int = 200
var sampleLimit: Int = 1000000
var timeoutSeconds: Int = 60
var outfile: Option[String] = None
var delimiter: String = ","
@@ -364,7 +364,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste

def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = {
val ref = DatasetRef(dataset)
val qOpts = QueryCommands.QueryOptions(options.spread, options.limit)
val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit)
.copy(queryTimeoutSecs = options.timeout.toSeconds.toInt,
shardOverrides = options.shardOverrides)
println(s"Sending query command to server for $ref with options $qOpts...")
@@ -373,7 +373,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
case Some(intervalSecs) =>
val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n =>
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, schema, result) => result.foreach(rv => println(rv.prettyPrint()))
case QueryResult(_, schema, result) => result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case err: QueryError => throw new ClientException(err)
}
}.recover {
@@ -385,7 +385,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
try {
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, schema, result) => println(s"Number of Range Vectors: ${result.size}")
result.foreach(rv => println(rv.prettyPrint()))
result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case QueryError(_,ex) => println(s"QueryError: ${ex.getClass.getSimpleName} ${ex.getMessage}")
}
} catch {
@@ -408,11 +408,11 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
case LongColumn => rowReader.getLong(position).toString
case DoubleColumn => rowReader.getDouble(position).toString
case StringColumn => rowReader.filoUTF8String(position).toString
case BitmapColumn => rowReader.getBoolean(position).toString
case TimestampColumn => rowReader.as[Timestamp](position).toString
case HistogramColumn => rowReader.getHistogram(position).toString
case _ => throw new UnsupportedOperationException("Unsupported type: " + columns(position).columnType)
}
content.update(position,value)
content.update(position, value)
position += 1
}
content
57 changes: 57 additions & 0 deletions conf/histogram-dev-source.conf
@@ -0,0 +1,57 @@
dataset = "histogram"
num-shards = 4
min-num-nodes = 2
sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"

sourceconfig {
# Required FiloDB configurations
filo-topic-name = "histogram-dev"

# Standard kafka configurations, e.g.
# This accepts both the standard kafka value of a comma-separated
# string and a Typesafe list of String values
# EXCEPT: do not populate value.deserializer, as the Kafka format is fixed in FiloDB to be messages of RecordContainers
bootstrap.servers = "localhost:9092"
group.id = "filo-db-histogram-ingestion"

# Values controlling in-memory store chunking, flushing, etc.
store {
# Interval it takes to flush ALL time series in a shard. This time is further divided by groups-per-shard
flush-interval = 1h

# TTL for on-disk / C* data. Data older than this may be purged.
disk-time-to-live = 24 hours

# amount of time paged chunks should be retained in memory.
# We need to have a minimum of x hours' worth of free blocks or else init won't work.
demand-paged-chunk-retention-period = 12 hours

max-chunks-size = 400

# Write buffer size, in bytes, for blob columns (histograms, UTF8Strings). Since these are variable data types,
# we need a maximum size, not a maximum number of items.
max-blob-buffer-size = 15000

# Number of bytes of offheap mem to allocate to chunk storage in each shard. Ex. 1000MB, 1G, 2GB
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
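#   e.g. at 5 bytes per sample, 512MB holds roughly 107 million samples, i.e. about 100,000 time series with ~1,000 samples each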
shard-mem-size = 512MB

# Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB
# Scales with the number of time series a shard should hold
ingestion-buffer-mem-size = 50MB

# Number of time series to evict at a time.
# num-partitions-to-evict = 1000

# Number of subgroups within each shard. Persistence to a ChunkSink occurs one subgroup at a time, as does
# recovery from failure. This many batches of flushes must occur to cover persistence of every partition
groups-per-shard = 20
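#   e.g. with flush-interval = 1h above and groups-per-shard = 20, one subgroup is flushed roughly every 3 minutes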

# Use a "MultiPartitionScan" or Cassandra MULTIGET for on-demand paging. Might improve performance.
multi-partition-odp = false
}
downsample {
# can be disabled by setting this flag to false
enabled = false
}
}
9 changes: 6 additions & 3 deletions conf/logback-dev.xml
@@ -8,11 +8,14 @@
</encoder>
</appender>

<logger name="filodb.coordinator" level="TRACE" />
<logger name="filodb.core" level="TRACE" />
<logger name="filodb.coordinator" level="DEBUG" />
<logger name="filodb.core" level="DEBUG" />
<logger name="filodb.memory" level="DEBUG" />
<logger name="filodb.query" level="DEBUG" />
<logger name="com.esotericsoftware.minlog" level="DEBUG" />
<logger name="filodb.coordinator.KamonMetricsLogReporter" level="off" />
<logger name="filodb.coordinator.KamonSpanLogReporter" level="off" />
<logger name="filodb.core.memstore.LuceneMetricsRouter" level="off" />
<!-- <logger name="com.esotericsoftware.minlog" level="DEBUG" /> -->
<!-- <logger name="com.esotericsoftware.kryo.io" level="TRACE" /> -->

<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="INFO"/>
17 changes: 17 additions & 0 deletions conf/timeseries-dev-source.conf
@@ -29,6 +29,10 @@

max-chunks-size = 400

# Write buffer size, in bytes, for blob columns (histograms, UTF8Strings). Since these are variable data types,
# we need a maximum size, not a maximum number of items.
max-blob-buffer-size = 15000

# Number of bytes of offheap mem to allocate to chunk storage in each shard. Ex. 1000MB, 1G, 2GB
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB
@@ -55,6 +59,19 @@

# Amount of time to delay before retrying
# retry-delay = 15s

# Capacity of Bloom filter used to track evicted partitions.
# Tune this based on how much time series churn is expected before a FiloDB node
# will be restarted for upgrade/maintenance. Do not take into account churn created by
# time series that are purged due to retention. When a time series has not been ingesting for the retention
# period, it is purged, not evicted. Purged PartKeys are not added to the Bloom Filter.
#
# To calculate Bloom Filter size:
# console> BloomFilter[String](5000000, falsePositiveRate = 0.01).numberOfBits
# res9: Long = 47925292
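#   i.e. 47925292 bits / 8 is about 5.99 million bytes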
# That's about 6MB
evicted-pk-bloom-filter-capacity = 50000

}
downsample {
# can be disabled by setting this flag to false
@@ -60,6 +60,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,

final val streamSubscriptions = new HashMap[Int, CancelableFuture[Unit]]
final val streams = new HashMap[Int, IngestionStream]
final val nodeCoord = context.parent

// Params for creating the default memStore flush scheduler
private final val numGroups = storeConfig.groupsPerShard
@@ -164,7 +165,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
create(shard, offset) map { ingestionStream =>
val stream = ingestionStream.get
logger.info(s"Starting normal/active ingestion for dataset=${dataset.ref} shard=$shard at offset $offset")
statusActor ! IngestionStarted(dataset.ref, shard, context.parent)
statusActor ! IngestionStarted(dataset.ref, shard, nodeCoord)

streamSubscriptions(shard) = memStore.ingestStream(dataset.ref,
shard,
@@ -209,7 +210,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
.withTag("shard", shard.toString)
.withTag("dataset", dataset.ref.toString).start()
val stream = ingestionStream.get
statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, 0)
statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, 0)

val shardInstance = memStore.asInstanceOf[TimeSeriesMemStore].getShardE(dataset.ref, shard)
val fut = memStore.recoverStream(dataset.ref, shard, stream, checkpoints, interval)
Expand All @@ -218,7 +219,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
else (off - startOffset) * 100 / (endOffset - startOffset)
logger.info(s"Recovery of dataset=${dataset.ref} shard=$shard at " +
s"$progressPct % - offset $off (target $endOffset)")
statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, progressPct.toInt)
statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, progressPct.toInt)
off }
.until(_ >= endOffset)
// TODO: move this code to TimeSeriesShard itself. Shard should control the thread