Merge branch '0.8.5-integration' to master
Tanvi Bhavsar authored and Tanvi Bhavsar committed Jun 6, 2019
2 parents 2377ecc + 75b5c44 commit 8a106b2
Showing 117 changed files with 3,160 additions and 1,145 deletions.
8 changes: 8 additions & 0 deletions README.md
@@ -52,6 +52,7 @@ and [filodb-discuss](https://groups.google.com/forum/#!forum/filodb-discuss) goo
- [Code Walkthrough](#code-walkthrough)
- [Building and Testing](#building-and-testing)
- [Debugging serialization and queries](#debugging-serialization-and-queries)
- [Other debugging tips](#other-debugging-tips)
- [Benchmarking](#benchmarking)
- [You can help!](#you-can-help)

@@ -708,6 +709,13 @@ To dynamically change the log level, you can use the `/admin/loglevel` HTTP API

curl -d 'trace' http://localhost:8080/admin/loglevel/com.esotericsoftware.minlog

### Other debugging tips

To debug raw record ingestion and data mismatches:

* Set the `trace-filters` source config setting (see `timeseries-dev-source.conf` and `tracedPartFilters` in `TimeSeriesShard`). This logs, at DEBUG level, every ingested sample for time series matching the criteria in `trace-filters`; see the example after this list.
* Use the filtering capability in `filodb.kafka.TestConsumer` to print out raw records received from Kafka.
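
For example, the (commented-out) block added to `conf/timeseries-dev-source.conf` later in this commit traces a single metric; the metric name is only a placeholder:

    trace-filters = {
      metric = "bad-metric-to-log"
    }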

### Benchmarking

To run benchmarks, from within SBT:
@@ -6,7 +6,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import com.datastax.driver.core.ConsistencyLevel
import com.datastax.driver.core.{ConsistencyLevel, SimpleStatement}
import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
@@ -40,17 +40,18 @@ sealed class PartitionIndexTable(val dataset: DatasetRef,
|) WITH compression = {
'sstable_compression': '$sstableCompression'}""".stripMargin

lazy val readCql = session.prepare(s"SELECT segmentid, segment " +
s"FROM $tableString WHERE shard = ? AND timebucket = ? order by segmentid asc")
lazy val readCql = s"SELECT segmentid, segment " +
s"FROM $tableString WHERE shard = ? AND timebucket = ? order by segmentid asc"

lazy val writePartitionCql = session.prepare(
s"INSERT INTO $tableString (shard, timebucket, segmentid, segment) VALUES (?, ?, ?, ?) USING TTL ?")
.setConsistencyLevel(writeConsistencyLevel)

def getPartKeySegments(shard: Int, timeBucket: Int): Observable[PartKeyTimeBucketSegment] = {
val it = session.execute(readCql.bind(shard: JInt, timeBucket: JInt))
// Fetch size should be low since each row is about 1MB; the default fetchSize can result in ReadTimeouts at the server
val it = session.execute(new SimpleStatement(readCql, shard: JInt, timeBucket: JInt).setFetchSize(15))
.asScala.toIterator.map(row => {
PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment"))
PartKeyTimeBucketSegment(row.getInt("segmentid"), row.getBytes("segment"))
})
Observable.fromIterator(it)
}
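
As a rough restatement of the pattern above (Datastax Java driver 3.x API), an unprepared `SimpleStatement` lets the fetch size be tuned per query; the table name and predicate below are simplified placeholders:

    import scala.collection.JavaConverters._
    import com.datastax.driver.core.{Session, SimpleStatement}

    // Each row carries a blob of roughly 1MB, so request small pages from the server
    // to avoid ReadTimeouts while it assembles a page.
    def readSegments(session: Session, shard: java.lang.Integer): Iterator[java.nio.ByteBuffer] = {
      val stmt = new SimpleStatement(
        "SELECT segmentid, segment FROM part_index WHERE shard = ?", shard)
        .setFetchSize(15)
      session.execute(stmt).asScala.toIterator.map(_.getBytes("segment"))
    }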
@@ -59,12 +59,12 @@ class CassandraMetaStore(config: Config, filoSessionProvider: Option[FiloSession
checkpointTable.writeCheckpoint(dataset, shardNum, groupNum, offset)
}

def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = {
def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = {
checkpointTable.readCheckpoints(dataset, shardNum)
}

def readEarliestCheckpoint(dataset: DatasetRef, shardNum: Int): Future[Long] = {
readCheckpoints(dataset,shardNum) map { m =>
readCheckpoints(dataset, shardNum) map { m =>
if (m.values.isEmpty) Long.MinValue else m.values.min
}
}
@@ -76,7 +76,7 @@ sealed class CheckpointTable(val config: Config,
dataset.dataset, shardNum: JInt, groupNum: JInt, offset: JLong))
}

def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int,Long]] = {
def readCheckpoints(dataset: DatasetRef, shardNum: Int): Future[Map[Int, Long]] = {
session.executeAsync(readCheckpointCql.bind(dataset.database.getOrElse(""),
dataset.dataset, shardNum: JInt))
.toIterator // future of Iterator
18 changes: 10 additions & 8 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -13,6 +13,7 @@ import monix.reactive.Observable

import filodb.coordinator._
import filodb.coordinator.client._
import filodb.coordinator.client.QueryCommands.{SpreadChange, SpreadProvider, StaticSpreadProvider}
import filodb.core._
import filodb.core.metadata.{Column, Dataset, DatasetOptions}
import filodb.core.store._
@@ -60,6 +61,7 @@ class Arguments extends FieldArgs {
var ignoreTagsOnPartitionKeyHash: Seq[String] = Nil
var everyNSeconds: Option[String] = None
var shards: Option[Seq[String]] = None
var spread: Option[Integer] = None
}

object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbClusterNode {
@@ -116,7 +118,6 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
}

def main(args: Arguments): Unit = {
val spread = config.getInt("default-spread")
try {
val timeout = args.timeoutSeconds.seconds
args.command match {
@@ -198,15 +199,15 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
require(args.host.nonEmpty && args.dataset.nonEmpty && args.matcher.nonEmpty, "--host, --dataset and --matcher must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseTimeSeriesMetadataQuery(remote, args.matcher.get, args.dataset.get,
getQueryRange(args), options)

case Some("labelValues") =>
require(args.host.nonEmpty && args.dataset.nonEmpty && args.labelNames.nonEmpty, "--host, --dataset and --labelName must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parseLabelValuesQuery(remote, args.labelNames, args.labelFilter, args.dataset.get,
getQueryRange(args), options)

@@ -216,7 +217,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
val options = QOptions(args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt),
timeout, args.shards.map(_.map(_.toInt)), spread)
timeout, args.shards.map(_.map(_.toInt)), args.spread)
parsePromQuery2(remote, query, args.dataset.get, getQueryRange(args), options)
}.orElse {
args.select.map { selectCols =>
@@ -267,13 +268,13 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
ignoreTagsOnPartitionKeyHash: Seq[String],
timeout: FiniteDuration): Unit = {
try {
val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers)
val options = DatasetOptions.DefaultOptions.copy(metricColumn = metricColumn,
shardKeyColumns = shardKeyColumns,
ignoreShardKeyColumnSuffixes = ignoreShardKeyColumnSuffixes,
ignoreTagsOnPartitionKeyHash = ignoreTagsOnPartitionKeyHash)
val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys, downsamplers, options)
println(s"Creating dataset $dataset with options $options...")
client.createNewDataset(datasetObj.copy(options = options), dataset.database)
client.createNewDataset(datasetObj, dataset.database)
exitCode = 0
} catch {
case b: Dataset.BadSchemaError =>
@@ -339,7 +340,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
final case class QOptions(limit: Int, sampleLimit: Int,
everyN: Option[Int], timeout: FiniteDuration,
shardOverrides: Option[Seq[Int]],
spread: Int)
spread: Option[Integer])

def parseTimeSeriesMetadataQuery(client: LocalClient, query: String, dataset: String,
timeParams: TimeRangeParams,
@@ -364,7 +365,8 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste

def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = {
val ref = DatasetRef(dataset)
val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit)
val spreadProvider: Option[SpreadProvider] = options.spread.map(s => StaticSpreadProvider(SpreadChange(0, s)))
val qOpts = QueryCommands.QueryOptions(spreadProvider, options.sampleLimit)
.copy(queryTimeoutSecs = options.timeout.toSeconds.toInt,
shardOverrides = options.shardOverrides)
println(s"Sending query command to server for $ref with options $qOpts...")
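
Assuming the CLI derives its flags from the `Arguments` fields (so the new field above surfaces as `--spread`), a per-query spread override would look roughly like this; host, dataset, and query are placeholders:

    ./filo-cli --host localhost --dataset prometheus --promql 'heap_usage{_ns="App-0"}' --spread 2

When `--spread` is omitted, the spread provider stays `None`, and the server presumably falls back to the `spread-default` / `spread-assignment` settings added in `conf/timeseries-filodb-server.conf` below.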
5 changes: 2 additions & 3 deletions conf/histogram-dev-source.conf
@@ -36,9 +36,8 @@
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB

# Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB
# Scales with the number of time series a shard should hold
ingestion-buffer-mem-size = 50MB
# Number of bytes of offheap mem to allocate to write buffers for all shards. Ex. 1000MB, 1G, 2GB
ingestion-buffer-mem-size = 200MB

# Number of time series to evict at a time.
# num-partitions-to-evict = 1000
3 changes: 1 addition & 2 deletions conf/logback-dev.xml
@@ -29,6 +29,5 @@

<root level="INFO">
<appender-ref ref="STDOUT"/>
<!-- Direct logs to Console in PIE. They get moved to Splunk -->
</root>
</configuration>
</configuration>
14 changes: 11 additions & 3 deletions conf/timeseries-dev-source.conf
@@ -37,9 +37,12 @@
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB

# Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB
# Scales with the number of time series a shard should hold
ingestion-buffer-mem-size = 50MB
# Number of bytes of offheap mem to allocate to write buffers for all shards. Ex. 1000MB, 1G, 2GB
ingestion-buffer-mem-size = 200MB

# Maximum number of write buffers to retain in each shard's WriteBufferPool. Any extra buffers are released
# back to native memory, which helps memory reuse.
# max-buffer-pool-size = 10000

# Number of time series to evict at a time.
# num-partitions-to-evict = 1000
@@ -72,6 +75,11 @@
# That's about 6MB
evicted-pk-bloom-filter-capacity = 50000

# Uncomment to log, at DEBUG level, ingested samples matching these filters on partition key columns.
# Only works for StringColumn fields in the partition key; scope may be expanded later.
# trace-filters = {
# metric = "bad-metric-to-log"
# }
}
downsample {
# can be disabled by setting this flag to false
5 changes: 2 additions & 3 deletions conf/timeseries-ds-1m-dev-source.conf
@@ -33,9 +33,8 @@
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB

# Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB
# Scales with the number of time series a shard should hold
ingestion-buffer-mem-size = 50MB
# Number of bytes of offheap mem to allocate to write buffers for all shards. Ex. 1000MB, 1G, 2GB
ingestion-buffer-mem-size = 200MB

# Number of time series to evict at a time.
# num-partitions-to-evict = 1000
13 changes: 13 additions & 0 deletions conf/timeseries-filodb-server.conf
@@ -5,6 +5,19 @@ filodb {
port = 9042
partition-list-num-groups = 1
}
spread-default = 1

# Override the default spread per application. Each entry matches on the non-metric shard key(s) (here _ns)
# and sets the spread to use for that application.
spread-assignment = [
{
_ns = App-0,
_spread_ = 2
},
{
_ns = App-5,
_spread_ = 0
}
]
}

kamon {
2 changes: 1 addition & 1 deletion conf/timeseries-standalonetest-source.conf
@@ -18,7 +18,7 @@
max-chunks-size = 400

shard-mem-size = 512MB
ingestion-buffer-mem-size = 50MB
ingestion-buffer-mem-size = 200MB
groups-per-shard = 20
multi-partition-odp = false
}