Merge Develop to 0.8.4-integration - bug fixes
tjackpaul authored Apr 5, 2019
2 parents f9b2c77 + 4183941 commit b1bb604
Showing 32 changed files with 483 additions and 252 deletions.
12 changes: 7 additions & 5 deletions README.md
@@ -202,10 +202,10 @@ You can also check the server logs at `logs/filodb-server-N.log`.
Now run the time series generator. This will ingest 20 time series (the default) with 100 samples each into the Kafka topic with current timestamps. The required argument is the path to the source config. Use `--help` for all the options.
```
java -cp gateway/target/scala-2.11/gateway-*-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-dev-source.conf
./dev-gateway.sh --gen-prom-data conf/timeseries-dev-source.conf
```
NOTE: The `TestTimeseriesProducer` logs to logs/gateway-server.log.
NOTE: Check logs/gateway-server.log for logs.
At this point, you should see a message like this in the server logs: `KAMON counter name=memstore-rows-ingested count=4999`
@@ -220,8 +220,9 @@ You can also look at Cassandra to check for persisted data. Look at the tables i
If the above does not work, try the following:
1) Delete the Kafka topic and re-create it. Note that Kafka topic deletion might not happen until the server is stopped and restarted
1a) Restart Kafka; this is sometimes necessary.
2) `./filodb-dev-stop.sh` and restart filodb instances like above
3) Re-run the `TestTimeseriesProducer`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.
3) Re-run `./dev-gateway.sh --gen-prom-data`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero.
To stop the dev server, run the following. Note that this will stop all the FiloDB servers if multiple are running.
```
@@ -308,7 +309,7 @@ Now if you curl the cluster status you should see 128 shards which are slowly tu
Generate records:

```
java -cp gateway/target/scala-2.11/gateway-*.telemetry-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-128shards-source.conf -p 5000
./dev-gateway.sh --gen-prom-data -p 5000 conf/timeseries-128shards-source.conf
```

## Understanding the FiloDB Data Model
@@ -589,7 +590,8 @@ The `filo-cli` accepts arguments and options as key-value pairs, specified like
| minutes | A shortcut to set the start at N minutes ago, and the stop at current time. Should specify a step also. |
| chunks | Either "memory" or "buffers" to select either all the in-memory chunks or the write buffers only. Should specify a step also. |
| database | Specifies the "database" the dataset should operate in. For Cassandra, this is the keyspace. If not specified, uses config value. |
| limit | The maximum number of samples per time series |
| limit | Limits the number of time series in the output |
| sampleLimit | Maximum number of output samples in the query result. An exception is thrown if the query returns more samples than this. |
| shards | (EXPERT) overrides the automatic shard calculation by passing in a comma-separated list of specific shards to query. Very useful to debug sharding issues. |
| everyNSeconds | Repeats the query every (argument) seconds |
| timeoutSeconds | The number of seconds for the network timeout |
12 changes: 6 additions & 6 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -34,9 +34,9 @@ class Arguments extends FieldArgs {
var rowKeys: Seq[String] = Seq("timestamp")
var partitionKeys: Seq[String] = Nil
var select: Option[Seq[String]] = None
// max # query items (vectors or tuples) returned. Don't make it too high.
var limit: Int = 1000
var sampleLimit: Int = 200
// max # of RangeVectors returned. Don't make it too high.
var limit: Int = 200
var sampleLimit: Int = 1000000
var timeoutSeconds: Int = 60
var outfile: Option[String] = None
var delimiter: String = ","
@@ -364,7 +364,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste

def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = {
val ref = DatasetRef(dataset)
val qOpts = QueryCommands.QueryOptions(options.spread, options.limit)
val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit)
.copy(queryTimeoutSecs = options.timeout.toSeconds.toInt,
shardOverrides = options.shardOverrides)
println(s"Sending query command to server for $ref with options $qOpts...")
@@ -373,7 +373,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
case Some(intervalSecs) =>
val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n =>
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, schema, result) => result.foreach(rv => println(rv.prettyPrint()))
case QueryResult(_, schema, result) => result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case err: QueryError => throw new ClientException(err)
}
}.recover {
@@ -385,7 +385,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
try {
client.logicalPlan2Query(ref, plan, qOpts) match {
case QueryResult(_, schema, result) => println(s"Number of Range Vectors: ${result.size}")
result.foreach(rv => println(rv.prettyPrint()))
result.take(options.limit).foreach(rv => println(rv.prettyPrint()))
case QueryError(_,ex) => println(s"QueryError: ${ex.getClass.getSimpleName} ${ex.getMessage}")
}
} catch {
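For clarity on the new split between the two options: `sampleLimit` now travels to the server inside `QueryOptions` and bounds the total result size, while `limit` only truncates how many RangeVectors the CLI prints (via `result.take(options.limit)`). The sketch below is purely illustrative — the case class and helper names are hypothetical, not FiloDB's API.

```
// Hypothetical sketch of the limit vs. sampleLimit split; not FiloDB code.
final case class RangeVector(name: String, samples: Seq[(Long, Double)])

object LimitSketch extends App {
  val limit = 200            // max # of RangeVectors printed by the CLI
  val sampleLimit = 1000000  // max # of samples the whole query result may contain

  // sampleLimit is enforced against the full result; exceeding it is an error.
  def enforceSampleLimit(result: Seq[RangeVector]): Seq[RangeVector] = {
    val totalSamples = result.iterator.map(_.samples.size).sum
    require(totalSamples <= sampleLimit, s"$totalSamples samples exceed sampleLimit=$sampleLimit")
    result
  }

  // limit only bounds how much output is printed, mirroring result.take(options.limit).
  def printLimited(result: Seq[RangeVector]): Unit =
    result.take(limit).foreach(rv => println(s"${rv.name}: ${rv.samples.mkString(", ")}"))

  printLimited(enforceSampleLimit(Seq(RangeVector("series_0", Seq(1000L -> 1.0, 2000L -> 2.0)))))
}
```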
57 changes: 57 additions & 0 deletions conf/histogram-dev-source.conf
@@ -0,0 +1,57 @@
dataset = "histogram"
num-shards = 4
min-num-nodes = 2
sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory"

sourceconfig {
# Required FiloDB configurations
filo-topic-name = "histogram-dev"

# Standard kafka configurations, e.g.
# This accepts both the standard kafka value of a comma-separated
# string and a Typesafe list of String values
# EXCEPT: do not populate value.deserializer, as the Kafka format is fixed in FiloDB to be messages of RecordContainers
bootstrap.servers = "localhost:9092"
group.id = "filo-db-histogram-ingestion"

# Values controlling in-memory store chunking, flushing, etc.
store {
# Interval it takes to flush ALL time series in a shard. This time is further divided by groups-per-shard
flush-interval = 1h

# TTL for on-disk / C* data. Data older than this may be purged.
disk-time-to-live = 24 hours

# amount of time paged chunks should be retained in memory.
# We need to have a minimum of x hours free blocks or else init won't work.
demand-paged-chunk-retention-period = 12 hours

max-chunks-size = 400

# Write buffer size, in bytes, for blob columns (histograms, UTF8Strings). Since these are variable data types,
# we need a maximum size, not a maximum number of items.
max-blob-buffer-size = 15000

# Number of bytes of offheap mem to allocate to chunk storage in each shard. Ex. 1000MB, 1G, 2GB
# Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series)
shard-mem-size = 512MB

# Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB
# Scales with the number of time series a shard should hold
ingestion-buffer-mem-size = 50MB

# Number of time series to evict at a time.
# num-partitions-to-evict = 1000

# Number of subgroups within each shard. Persistence to a ChunkSink occurs one subgroup at a time, as does
# recovery from failure. This many batches of flushes must occur to cover persistence of every partition
groups-per-shard = 20

# Use a "MultiPartitionScan" or Cassandra MULTIGET for on-demand paging. Might improve performance.
multi-partition-odp = false
}
downsample {
# can be disabled by setting this flag to false
enabled = false
}
}
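The new `conf/histogram-dev-source.conf` above is plain HOCON, so its values can be sanity-checked with the Typesafe Config library before wiring it into an ingestion stream. A minimal sketch, assuming only that the file sits at the path shown; it makes no claims about how FiloDB itself loads the config.

```
import java.io.File
import com.typesafe.config.ConfigFactory

object HistogramSourceConfigCheck extends App {
  // Parse the raw HOCON file; resolve() expands any substitutions.
  val conf = ConfigFactory.parseFile(new File("conf/histogram-dev-source.conf")).resolve()

  println(conf.getString("dataset"))              // histogram
  println(conf.getInt("num-shards"))              // 4

  val source = conf.getConfig("sourceconfig")
  println(source.getString("filo-topic-name"))    // histogram-dev

  val store = source.getConfig("store")
  println(store.getDuration("flush-interval"))              // PT1H (java.time.Duration)
  println(store.getMemorySize("shard-mem-size").toBytes)    // bytes behind "512MB"
  println(store.getInt("groups-per-shard"))                 // 20
}
```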
@@ -60,6 +60,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,

final val streamSubscriptions = new HashMap[Int, CancelableFuture[Unit]]
final val streams = new HashMap[Int, IngestionStream]
final val nodeCoord = context.parent

// Params for creating the default memStore flush scheduler
private final val numGroups = storeConfig.groupsPerShard
@@ -164,7 +165,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
create(shard, offset) map { ingestionStream =>
val stream = ingestionStream.get
logger.info(s"Starting normal/active ingestion for dataset=${dataset.ref} shard=$shard at offset $offset")
statusActor ! IngestionStarted(dataset.ref, shard, context.parent)
statusActor ! IngestionStarted(dataset.ref, shard, nodeCoord)

streamSubscriptions(shard) = memStore.ingestStream(dataset.ref,
shard,
@@ -209,7 +210,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
.withTag("shard", shard.toString)
.withTag("dataset", dataset.ref.toString).start()
val stream = ingestionStream.get
statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, 0)
statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, 0)

val shardInstance = memStore.asInstanceOf[TimeSeriesMemStore].getShardE(dataset.ref, shard)
val fut = memStore.recoverStream(dataset.ref, shard, stream, checkpoints, interval)
@@ -218,7 +219,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
else (off - startOffset) * 100 / (endOffset - startOffset)
logger.info(s"Recovery of dataset=${dataset.ref} shard=$shard at " +
s"$progressPct % - offset $off (target $endOffset)")
statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, progressPct.toInt)
statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, progressPct.toInt)
off }
.until(_ >= endOffset)
// TODO: move this code to TimeSeriesShard itself. Shard should control the thread
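The small `nodeCoord = context.parent` change in `IngestionActor` follows a standard Akka guideline: `ActorContext` should only be touched from the actor's own message-processing thread, so a reference needed inside asynchronous callbacks (stream subscriptions, futures, recovery progress updates) is captured into a stable `val` up front. The illustrative actor below shows the pattern; it is not FiloDB code.

```
import akka.actor.{Actor, ActorRef}
import scala.concurrent.Future

// Illustrative only: capture context.parent before using it off the actor thread.
class ProgressReporter extends Actor {
  import context.dispatcher

  // Safe: read while still on the actor's thread.
  private val parent: ActorRef = context.parent

  def receive: Receive = {
    case "start" =>
      Future {
        // ... long-running work off the actor thread ...
        100
      }.foreach { progressPct =>
        // Uses the captured val; dereferencing context.parent inside this
        // callback would race with the actor's lifecycle.
        parent ! s"RecoveryInProgress $progressPct%"
      }
  }
}
```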
@@ -1,5 +1,7 @@
package filodb.coordinator

import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.HashMap
import scala.concurrent.duration._

@@ -57,7 +59,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
val ingesters = new HashMap[DatasetRef, ActorRef]
val queryActors = new HashMap[DatasetRef, ActorRef]
var clusterActor: Option[ActorRef] = None
val shardMaps = new HashMap[DatasetRef, ShardMapper]
val shardMaps = new ConcurrentHashMap[DatasetRef, ShardMapper]
var statusActor: Option[ActorRef] = None

private val statusAckTimeout = config.as[FiniteDuration]("tasks.timeouts.status-ack-timeout")
@@ -137,7 +139,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
ingesters(ref) = ingester

logger.info(s"Creating QueryActor for dataset $ref")
val queryRef = context.actorOf(QueryActor.props(memStore, dataset, shardMaps(ref)), s"$Query-$ref")
val queryRef = context.actorOf(QueryActor.props(memStore, dataset, shardMaps.get(ref)), s"$Query-$ref")
nca.tell(SubscribeShardUpdates(ref), self)
queryActors(ref) = queryRef

@@ -190,7 +192,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
case NodeProtocol.ResetState => reset(sender())
case CurrentShardSnapshot(ds, mapper) =>
logger.debug(s"Received ShardSnapshot $mapper")
shardMaps(ds) = mapper
shardMaps.put(ds, mapper)
// NOTE: QueryActor has AtomicRef so no need to forward message to it
}

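Switching `shardMaps` from a Scala `mutable.HashMap` to `java.util.concurrent.ConcurrentHashMap` matters because the map is updated by the actor but may now be read from other threads; a plain `HashMap` offers no thread-safety or visibility guarantees for that access pattern (hence the `shardMaps.get(ref)` / `shardMaps.put(ds, mapper)` call style above). A small standalone illustration, not FiloDB code:

```
import java.util.concurrent.ConcurrentHashMap

object ShardMapSketch extends App {
  // String values stand in for ShardMapper instances.
  val shardMaps = new ConcurrentHashMap[String, String]()

  // The writer (the actor) publishes snapshots with put(); the write is safely
  // visible to any thread that later calls get().
  shardMaps.put("timeseries", "mapper-snapshot-1")

  // Readers use the Java-style get(key) instead of Scala's apply(key); a missing
  // key yields null rather than throwing.
  val reader = new Thread(new Runnable {
    def run(): Unit = println(s"reader sees ${shardMaps.get("timeseries")}")
  })
  reader.start()
  reader.join()
}
```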
@@ -8,6 +8,7 @@ import akka.actor.{ActorRef, ActorSystem, Props}
import akka.dispatch.{Envelope, UnboundedStablePriorityMailbox}
import com.typesafe.config.Config
import kamon.Kamon
import monix.execution.Scheduler

import filodb.coordinator.queryengine2.QueryEngine
import filodb.core._
@@ -52,11 +53,12 @@ final class QueryActor(memStore: MemStore,
import QueryActor._
import client.QueryCommands._

implicit val scheduler = monix.execution.Scheduler(context.dispatcher)
val config = context.system.settings.config

val queryEngine2 = new QueryEngine(dataset, shardMapFunc)
val queryConfig = new QueryConfig(config.getConfig("filodb.query"))
val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors)
implicit val scheduler = Scheduler.fixedPool(s"query-${dataset.ref}", numSchedThreads.toInt)

private val tags = Map("dataset" -> dataset.ref.toString)
private val lpRequests = Kamon.counter("queryactor-logicalPlan-requests").refine(tags)
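`QueryActor` now runs queries on its own fixed-size Monix pool, sized by the new `filodb.query.threads-factor` setting, instead of piggybacking on the actor dispatcher. A minimal sketch of that sizing and pool creation (the pool name and factor value here are arbitrary):

```
import monix.execution.Scheduler

object QueryPoolSketch extends App {
  // Mirrors filodb.query.threads-factor: threads = ceil(available processors * factor)
  val threadsFactor = 1.0
  val numSchedThreads = math.ceil(Runtime.getRuntime.availableProcessors * threadsFactor).toInt

  // A dedicated pool per dataset keeps heavy query work off the Akka dispatcher,
  // so slow queries cannot starve actor message processing.
  val querySched: Scheduler = Scheduler.fixedPool("query-demo-dataset", numSchedThreads)

  // Scheduler is an ExecutionContext, so work can be submitted directly.
  querySched.execute(new Runnable {
    def run(): Unit = println(s"query running on ${Thread.currentThread.getName}")
  })
}
```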
@@ -449,7 +449,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings,
// Above condition ensures that we respond to shard events only from the node shard is currently assigned to.
// Needed to avoid race conditions where IngestionStopped for an old assignment comes after shard is reassigned.
updateFromShardEvent(event)
publishSnapshot(event.ref)
// RecoveryInProgress status results in too many messages that really do not need a publish
if (!event.isInstanceOf[RecoveryInProgress]) publishSnapshot(event.ref)
// reassign shard if IngestionError. Exclude previous node since it had error shards.
event match {
case _: IngestionError =>
@@ -1,6 +1,7 @@
package filodb.coordinator.queryengine2

import java.util.{SplittableRandom, UUID}
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
@@ -362,10 +363,7 @@ class QueryEngine(dataset: Dataset,
val childTargets = children.map(_.dispatcher)
// Above list can contain duplicate dispatchers, and we don't make them distinct.
// Those with more shards must be weighed higher
childTargets.iterator.drop(QueryEngine.random.nextInt(childTargets.size)).next
val rnd = ThreadLocalRandom.current()
childTargets.iterator.drop(rnd.nextInt(childTargets.size)).next
}
}

object QueryEngine {
val random = new SplittableRandom()
}
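The `QueryEngine` change drops the shared `SplittableRandom` (which is not thread-safe) in favour of `ThreadLocalRandom`, so concurrent query threads no longer contend on a single generator while picking a dispatcher. A tiny sketch of the pick-one-at-random idiom, with arbitrary placeholder targets:

```
import java.util.concurrent.ThreadLocalRandom

object PickRandomSketch extends App {
  // Duplicates are intentional: targets that appear more often get picked more often.
  val childTargets = Seq("dispatcher-a", "dispatcher-b", "dispatcher-b", "dispatcher-c")

  // ThreadLocalRandom.current() returns a per-thread generator, so concurrent
  // callers never share mutable random state.
  val rnd = ThreadLocalRandom.current()
  val picked = childTargets.iterator.drop(rnd.nextInt(childTargets.size)).next()

  println(s"picked $picked")
}
```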
@@ -238,6 +238,33 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew
}
}

it("should parse and execute concurrent LogicalPlan queries") {
val ref = setupTimeSeries()
probe.send(coordinatorActor, IngestRows(ref, 0, records(dataset1, linearMultiSeries().take(40))))
probe.expectMsg(Ack(0L))

memStore.commitIndexForTesting(dataset1.ref)

val numQueries = 6

val series2 = (2 to 4).map(n => s"Series $n").toSet.asInstanceOf[Set[Any]]
val multiFilter = Seq(ColumnFilter("series", Filter.In(series2)))
val q2 = LogicalPlan2Query(ref,
Aggregate(AggregationOperator.Avg,
PeriodicSeries(
RawSeries(AllChunksSelector, multiFilter, Seq("min")), 120000L, 10000L, 130000L)), qOpt)
(0 until numQueries).foreach { i => probe.send(coordinatorActor, q2) }

(0 until numQueries).foreach { _ =>
probe.expectMsgPF() {
case QueryResult(_, schema, vectors) =>
schema shouldEqual timeMinSchema
vectors should have length (1)
vectors(0).rows.map(_.getDouble(1)).toSeq shouldEqual Seq(14.0, 24.0)
}
}
}

ignore("should aggregate from multiple shards") {
val ref = setupTimeSeries(2)
probe.send(coordinatorActor, IngestRows(ref, 0, records(dataset1, linearMultiSeries().take(30))))
37 changes: 15 additions & 22 deletions core/src/main/resources/filodb-defaults.conf
@@ -38,6 +38,9 @@ filodb {

# Minimum step required for a query
min-step = 5 seconds

# Parallelism (query threadpool per dataset) ... ceil(available processors * factor)
threads-factor = 1.0
}

shard-manager {
@@ -243,6 +246,12 @@ akka {
"filodb.query.LogicalPlan" = kryo
}

# Reduce the number of threads used by the default dispatcher's fork-join pool, as it's not doing much work.
default-dispatcher.fork-join-executor {
parallelism-factor = 2.0
parallelism-max = 32
}

# Just the defaults to start with. TODO optimize and pick the executor needed.
shard-status-dispatcher {
type = Dispatcher
@@ -303,27 +312,6 @@ akka {
}
}

# Just the defaults to start with. TODO optimize and pick the executor needed.
shard-status-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}

# Be sure to terminate/exit JVM process after Akka shuts down. This is important for the
# custom downing provider's split brain resolution to work properly. Basically, the minority
# group will shut down itself and exit the process, helping to bring newer nodes online.
@@ -339,4 +327,9 @@ custom-downing {
down-if-in-minority = true
shutdown-actor-system-on-resolution = true
}
}
}

kamon.zipkin {
max.requests = 128
message.max.bytes = 262144
}
