diff --git a/README.md b/README.md index 1ed775acfb..4f8b95c488 100644 --- a/README.md +++ b/README.md @@ -202,10 +202,10 @@ You can also check the server logs at `logs/filodb-server-N.log`. Now run the time series generator. This will ingest 20 time series (the default) with 100 samples each into the Kafka topic with current timestamps. The required argument is the path to the source config. Use `--help` for all the options. ``` -java -cp gateway/target/scala-2.11/gateway-*-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-dev-source.conf +./dev-gateway.sh --gen-prom-data conf/timeseries-dev-source.conf ``` -NOTE: The `TestTimeseriesProducer` logs to logs/gateway-server.log. +NOTE: Check logs/gateway-server.log for logs. At this point, you should be able to confirm such a message in the server logs: `KAMON counter name=memstore-rows-ingested count=4999` @@ -220,8 +220,9 @@ You can also look at Cassandra to check for persisted data. Look at the tables i If the above does not work, try the following: 1) Delete the Kafka topic and re-create it. Note that Kafka topic deletion might not happen until the server is stopped and restarted +1a) Restart Kafka, this is sometimes necessary. 2) `./filodb-dev-stop.sh` and restart filodb instances like above -3) Re-run the `TestTimeseriesProducer`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero. +3) Re-run `./dev-gateway.sh --gen-prom-data`. You can check consumption via running the `TestConsumer`, like this: `java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf -cp standalone/target/scala-2.11/standalone-assembly-0.8-SNAPSHOT.jar filodb.kafka.TestConsumer conf/timeseries-dev-source.conf`. Also, the `memstore_rows_ingested` metric which is logged to `logs/filodb-server-N.log` should become nonzero. To stop the dev server. Note that this will stop all the FiloDB servers if multiple are running. ``` @@ -308,7 +309,7 @@ Now if you curl the cluster status you should see 128 shards which are slowly tu Generate records: ``` -java -cp gateway/target/scala-2.11/gateway-*.telemetry-SNAPSHOT filodb.timeseries.TestTimeseriesProducer -c conf/timeseries-128shards-source.conf -p 5000 +./dev-gateway.sh --gen-prom-data -p 5000 conf/timeseries-128shards-source.conf ``` ## Understanding the FiloDB Data Model @@ -589,7 +590,8 @@ The `filo-cli` accepts arguments and options as key-value pairs, specified like | minutes | A shortcut to set the start at N minutes ago, and the stop at current time. Should specify a step also. | | chunks | Either "memory" or "buffers" to select either all the in-memory chunks or the write buffers only. Should specify a step also. | | database | Specifies the "database" the dataset should operate in. For Cassandra, this is the keyspace. If not specified, uses config value. | -| limit | The maximum number of samples per time series | +| limit | Limits the number of time series in the output | +| sampleLimit | Maximum number of output samples in the query result. An exception is thrown if the output returns more results than this. | | shards | (EXPERT) overrides the automatic shard calculation by passing in a comma-separated list of specific shards to query. Very useful to debug sharding issues. | | everyNSeconds | Repeats the query every (argument) seconds | | timeoutSeconds | The number of seconds for the network timeout | diff --git a/cli/src/main/scala/filodb.cli/CliMain.scala b/cli/src/main/scala/filodb.cli/CliMain.scala index 790909cfb4..47b153b2f7 100644 --- a/cli/src/main/scala/filodb.cli/CliMain.scala +++ b/cli/src/main/scala/filodb.cli/CliMain.scala @@ -34,9 +34,9 @@ class Arguments extends FieldArgs { var rowKeys: Seq[String] = Seq("timestamp") var partitionKeys: Seq[String] = Nil var select: Option[Seq[String]] = None - // max # query items (vectors or tuples) returned. Don't make it too high. - var limit: Int = 1000 - var sampleLimit: Int = 200 + // max # of RangeVectors returned. Don't make it too high. + var limit: Int = 200 + var sampleLimit: Int = 1000000 var timeoutSeconds: Int = 60 var outfile: Option[String] = None var delimiter: String = "," @@ -364,7 +364,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste def executeQuery2(client: LocalClient, dataset: String, plan: LogicalPlan, options: QOptions): Unit = { val ref = DatasetRef(dataset) - val qOpts = QueryCommands.QueryOptions(options.spread, options.limit) + val qOpts = QueryCommands.QueryOptions(options.spread, options.sampleLimit) .copy(queryTimeoutSecs = options.timeout.toSeconds.toInt, shardOverrides = options.shardOverrides) println(s"Sending query command to server for $ref with options $qOpts...") @@ -373,7 +373,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste case Some(intervalSecs) => val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n => client.logicalPlan2Query(ref, plan, qOpts) match { - case QueryResult(_, schema, result) => result.foreach(rv => println(rv.prettyPrint())) + case QueryResult(_, schema, result) => result.take(options.limit).foreach(rv => println(rv.prettyPrint())) case err: QueryError => throw new ClientException(err) } }.recover { @@ -385,7 +385,7 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste try { client.logicalPlan2Query(ref, plan, qOpts) match { case QueryResult(_, schema, result) => println(s"Number of Range Vectors: ${result.size}") - result.foreach(rv => println(rv.prettyPrint())) + result.take(options.limit).foreach(rv => println(rv.prettyPrint())) case QueryError(_,ex) => println(s"QueryError: ${ex.getClass.getSimpleName} ${ex.getMessage}") } } catch { diff --git a/conf/histogram-dev-source.conf b/conf/histogram-dev-source.conf new file mode 100644 index 0000000000..51b7c6e39c --- /dev/null +++ b/conf/histogram-dev-source.conf @@ -0,0 +1,57 @@ + dataset = "histogram" + num-shards = 4 + min-num-nodes = 2 + sourcefactory = "filodb.kafka.KafkaIngestionStreamFactory" + + sourceconfig { + # Required FiloDB configurations + filo-topic-name = "histogram-dev" + + # Standard kafka configurations, e.g. + # This accepts both the standard kafka value of a comma-separated + # string and a Typesafe list of String values + # EXCEPT: do not populate value.deserializer, as the Kafka format is fixed in FiloDB to be messages of RecordContainer's + bootstrap.servers = "localhost:9092" + group.id = "filo-db-histogram-ingestion" + + # Values controlling in-memory store chunking, flushing, etc. + store { + # Interval it takes to flush ALL time series in a shard. This time is further divided by groups-per-shard + flush-interval = 1h + + # TTL for on-disk / C* data. Data older than this may be purged. + disk-time-to-live = 24 hours + + # amount of time paged chunks should be retained in memory. + # We need to have a minimum of x hours free blocks or else init won't work. + demand-paged-chunk-retention-period = 12 hours + + max-chunks-size = 400 + + # Write buffer size, in bytes, for blob columns (histograms, UTF8Strings). Since these are variable data types, + # we need a maximum size, not a maximum number of items. + max-blob-buffer-size = 15000 + + # Number of bytes of offheap mem to allocate to chunk storage in each shard. Ex. 1000MB, 1G, 2GB + # Assume 5 bytes per sample, should be roughly equal to (# samples per time series) * (# time series) + shard-mem-size = 512MB + + # Number of bytes of offheap mem to allocate to write buffers in each shard. Ex. 1000MB, 1G, 2GB + # Scales with the number of time series a shard should hold + ingestion-buffer-mem-size = 50MB + + # Number of time series to evict at a time. + # num-partitions-to-evict = 1000 + + # Number of subgroups within each shard. Persistence to a ChunkSink occurs one subgroup at a time, as does + # recovery from failure. This many batches of flushes must occur to cover persistence of every partition + groups-per-shard = 20 + + # Use a "MultiPartitionScan" or Cassandra MULTIGET for on-demand paging. Might improve performance. + multi-partition-odp = false + } + downsample { + # can be disabled by setting this flag to false + enabled = false + } + } \ No newline at end of file diff --git a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala index 50cc0e09d2..228225249c 100644 --- a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala @@ -60,6 +60,7 @@ private[filodb] final class IngestionActor(dataset: Dataset, final val streamSubscriptions = new HashMap[Int, CancelableFuture[Unit]] final val streams = new HashMap[Int, IngestionStream] + final val nodeCoord = context.parent // Params for creating the default memStore flush scheduler private final val numGroups = storeConfig.groupsPerShard @@ -164,7 +165,7 @@ private[filodb] final class IngestionActor(dataset: Dataset, create(shard, offset) map { ingestionStream => val stream = ingestionStream.get logger.info(s"Starting normal/active ingestion for dataset=${dataset.ref} shard=$shard at offset $offset") - statusActor ! IngestionStarted(dataset.ref, shard, context.parent) + statusActor ! IngestionStarted(dataset.ref, shard, nodeCoord) streamSubscriptions(shard) = memStore.ingestStream(dataset.ref, shard, @@ -209,7 +210,7 @@ private[filodb] final class IngestionActor(dataset: Dataset, .withTag("shard", shard.toString) .withTag("dataset", dataset.ref.toString).start() val stream = ingestionStream.get - statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, 0) + statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, 0) val shardInstance = memStore.asInstanceOf[TimeSeriesMemStore].getShardE(dataset.ref, shard) val fut = memStore.recoverStream(dataset.ref, shard, stream, checkpoints, interval) @@ -218,7 +219,7 @@ private[filodb] final class IngestionActor(dataset: Dataset, else (off - startOffset) * 100 / (endOffset - startOffset) logger.info(s"Recovery of dataset=${dataset.ref} shard=$shard at " + s"$progressPct % - offset $off (target $endOffset)") - statusActor ! RecoveryInProgress(dataset.ref, shard, context.parent, progressPct.toInt) + statusActor ! RecoveryInProgress(dataset.ref, shard, nodeCoord, progressPct.toInt) off } .until(_ >= endOffset) // TODO: move this code to TimeSeriesShard itself. Shard should control the thread diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala index d96e20fe65..a69291a6c1 100644 --- a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala @@ -1,5 +1,7 @@ package filodb.coordinator +import java.util.concurrent.ConcurrentHashMap + import scala.collection.mutable.HashMap import scala.concurrent.duration._ @@ -57,7 +59,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, val ingesters = new HashMap[DatasetRef, ActorRef] val queryActors = new HashMap[DatasetRef, ActorRef] var clusterActor: Option[ActorRef] = None - val shardMaps = new HashMap[DatasetRef, ShardMapper] + val shardMaps = new ConcurrentHashMap[DatasetRef, ShardMapper] var statusActor: Option[ActorRef] = None private val statusAckTimeout = config.as[FiniteDuration]("tasks.timeouts.status-ack-timeout") @@ -137,7 +139,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, ingesters(ref) = ingester logger.info(s"Creating QueryActor for dataset $ref") - val queryRef = context.actorOf(QueryActor.props(memStore, dataset, shardMaps(ref)), s"$Query-$ref") + val queryRef = context.actorOf(QueryActor.props(memStore, dataset, shardMaps.get(ref)), s"$Query-$ref") nca.tell(SubscribeShardUpdates(ref), self) queryActors(ref) = queryRef @@ -190,7 +192,7 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, case NodeProtocol.ResetState => reset(sender()) case CurrentShardSnapshot(ds, mapper) => logger.debug(s"Received ShardSnapshot $mapper") - shardMaps(ds) = mapper + shardMaps.put(ds, mapper) // NOTE: QueryActor has AtomicRef so no need to forward message to it } diff --git a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala index 7f458dc617..d01bb625e6 100644 --- a/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/QueryActor.scala @@ -8,6 +8,7 @@ import akka.actor.{ActorRef, ActorSystem, Props} import akka.dispatch.{Envelope, UnboundedStablePriorityMailbox} import com.typesafe.config.Config import kamon.Kamon +import monix.execution.Scheduler import filodb.coordinator.queryengine2.QueryEngine import filodb.core._ @@ -52,11 +53,12 @@ final class QueryActor(memStore: MemStore, import QueryActor._ import client.QueryCommands._ - implicit val scheduler = monix.execution.Scheduler(context.dispatcher) val config = context.system.settings.config val queryEngine2 = new QueryEngine(dataset, shardMapFunc) val queryConfig = new QueryConfig(config.getConfig("filodb.query")) + val numSchedThreads = Math.ceil(config.getDouble("filodb.query.threads-factor") * sys.runtime.availableProcessors) + implicit val scheduler = Scheduler.fixedPool(s"query-${dataset.ref}", numSchedThreads.toInt) private val tags = Map("dataset" -> dataset.ref.toString) private val lpRequests = Kamon.counter("queryactor-logicalPlan-requests").refine(tags) diff --git a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala index b846929831..6ea9afa123 100644 --- a/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala +++ b/coordinator/src/main/scala/filodb.coordinator/ShardManager.scala @@ -449,7 +449,8 @@ private[coordinator] final class ShardManager(settings: FilodbSettings, // Above condition ensures that we respond to shard events only from the node shard is currently assigned to. // Needed to avoid race conditions where IngestionStopped for an old assignment comes after shard is reassigned. updateFromShardEvent(event) - publishSnapshot(event.ref) + // RecoveryInProgress status results in too many messages that really do not need a publish + if (!event.isInstanceOf[RecoveryInProgress]) publishSnapshot(event.ref) // reassign shard if IngestionError. Exclude previous node since it had error shards. event match { case _: IngestionError => diff --git a/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala b/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala index 5b5ffa1a99..d003c9eda7 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryengine2/QueryEngine.scala @@ -1,6 +1,7 @@ package filodb.coordinator.queryengine2 -import java.util.{SplittableRandom, UUID} +import java.util.UUID +import java.util.concurrent.ThreadLocalRandom import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration @@ -362,10 +363,7 @@ class QueryEngine(dataset: Dataset, val childTargets = children.map(_.dispatcher) // Above list can contain duplicate dispatchers, and we don't make them distinct. // Those with more shards must be weighed higher - childTargets.iterator.drop(QueryEngine.random.nextInt(childTargets.size)).next + val rnd = ThreadLocalRandom.current() + childTargets.iterator.drop(rnd.nextInt(childTargets.size)).next } } - -object QueryEngine { - val random = new SplittableRandom() -} diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala index e2369a1010..011ea417d4 100644 --- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala @@ -238,6 +238,33 @@ class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNew } } + it("should parse and execute concurrent LogicalPlan queries") { + val ref = setupTimeSeries() + probe.send(coordinatorActor, IngestRows(ref, 0, records(dataset1, linearMultiSeries().take(40)))) + probe.expectMsg(Ack(0L)) + + memStore.commitIndexForTesting(dataset1.ref) + + val numQueries = 6 + + val series2 = (2 to 4).map(n => s"Series $n").toSet.asInstanceOf[Set[Any]] + val multiFilter = Seq(ColumnFilter("series", Filter.In(series2))) + val q2 = LogicalPlan2Query(ref, + Aggregate(AggregationOperator.Avg, + PeriodicSeries( + RawSeries(AllChunksSelector, multiFilter, Seq("min")), 120000L, 10000L, 130000L)), qOpt) + (0 until numQueries).foreach { i => probe.send(coordinatorActor, q2) } + + (0 until numQueries).foreach { _ => + probe.expectMsgPF() { + case QueryResult(_, schema, vectors) => + schema shouldEqual timeMinSchema + vectors should have length (1) + vectors(0).rows.map(_.getDouble(1)).toSeq shouldEqual Seq(14.0, 24.0) + } + } + } + ignore("should aggregate from multiple shards") { val ref = setupTimeSeries(2) probe.send(coordinatorActor, IngestRows(ref, 0, records(dataset1, linearMultiSeries().take(30)))) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index d4f72108bf..3b24108270 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -38,6 +38,9 @@ filodb { # Minimum step required for a query min-step = 5 seconds + + # Parallelism (query threadpool per dataset) ... ceil(available processors * factor) + threads-factor = 1.0 } shard-manager { @@ -243,6 +246,12 @@ akka { "filodb.query.LogicalPlan" = kryo } + # Reduce the number of threads used by default by the fork-join pool, as it's not really doing much work. + default-dispatcher.fork-join-executor { + parallelism-factor = 2.0 + parallelism-max = 32 + } + # Just the defaults to start with. TODO optimize and pick the executor needed. shard-status-dispatcher { type = Dispatcher @@ -303,27 +312,6 @@ akka { } } - # Just the defaults to start with. TODO optimize and pick the executor needed. - shard-status-dispatcher { - # Dispatcher is the name of the event-based dispatcher - type = Dispatcher - # What kind of ExecutionService to use - executor = "fork-join-executor" - # Configuration for the fork join pool - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 2 - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 2.0 - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 10 - } - # Throughput defines the maximum number of messages to be - # processed per actor before the thread jumps to the next actor. - # Set to 1 for as fair as possible. - throughput = 100 - } - # Be sure to terminate/exit JVM process after Akka shuts down. This is important for the # custom downing provider's split brain resolution to work properly. Basically, the minority # group will shut down itself and exit the process, helping to bring newer nodes online. @@ -339,4 +327,9 @@ custom-downing { down-if-in-minority = true shutdown-actor-system-on-resolution = true } -} \ No newline at end of file +} + +kamon.zipkin { + max.requests = 128 + message.max.bytes = 262144 +} diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala index 75777a42ee..e32073b1b1 100644 --- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala +++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala @@ -17,6 +17,7 @@ import monix.eval.Task import monix.execution.{Scheduler, UncaughtExceptionReporter} import monix.execution.atomic.AtomicBoolean import monix.reactive.Observable +import org.apache.lucene.util.BytesRef import org.jctools.maps.NonBlockingHashMapLong import scalaxy.loops._ @@ -429,16 +430,23 @@ class TimeSeriesShard(val dataset: Dataset, .withTag("dataset", dataset.name) .withTag("shard", shardNum).start() + /* We need this map to track partKey->partId because lucene index cannot be looked up + using partKey efficiently, and more importantly, it is eventually consistent. + The map and contents will be garbage collected after we are done with recovery */ + val partIdMap = debox.Map.empty[BytesRef, Int] + val earliestTimeBucket = Math.max(0, currentIndexTimeBucket - numTimeBucketsToRetain) logger.info(s"Recovering timebuckets $earliestTimeBucket until $currentIndexTimeBucket " + s"for dataset=${dataset.ref} shard=$shardNum ") - val timeBuckets = for { tb <- earliestTimeBucket until currentIndexTimeBucket } yield { + // go through the buckets in reverse order to first one wins and we need not rewrite + // entries in lucene + val timeBuckets = for { tb <- currentIndexTimeBucket until earliestTimeBucket by -1 } yield { colStore.getPartKeyTimeBucket(dataset, shardNum, tb).map { b => new IndexData(tb, b.segmentId, RecordContainer(b.segment.array())) } } val fut = Observable.flatten(timeBuckets: _*) - .foreach(tb => extractTimeBucket(tb))(ingestSched) + .foreach(tb => extractTimeBucket(tb, partIdMap))(ingestSched) .map(_ => completeIndexRecovery()) fut.onComplete(_ => tracer.finish()) fut @@ -450,7 +458,8 @@ class TimeSeriesShard(val dataset: Dataset, logger.info(s"Bootstrapped index for dataset=${dataset.ref} shard=$shardNum") } - private[memstore] def extractTimeBucket(segment: IndexData): Unit = { + // scalastyle:off method.length + private[memstore] def extractTimeBucket(segment: IndexData, partIdMap: debox.Map[BytesRef, Int]): Unit = { var numRecordsProcessed = 0 segment.records.iterate(indexTimeBucketSchema).foreach { row => // read binary record and extract the indexable data fields @@ -459,44 +468,63 @@ class TimeSeriesShard(val dataset: Dataset, val partKeyBaseOnHeap = row.getBlobBase(2).asInstanceOf[Array[Byte]] val partKeyOffset = row.getBlobOffset(2) val partKeyNumBytes = row.getBlobNumBytes(2) + val partKeyBytesRef = new BytesRef(partKeyBaseOnHeap, + PartKeyLuceneIndex.unsafeOffsetToBytesRefOffset(partKeyOffset), + partKeyNumBytes) - // look up partId in partSet if it already exists before assigning new partId. + // look up partKey in partIdMap if it already exists before assigning new partId. // We cant look it up in lucene because we havent flushed index yet - val partId = partSet.getWithPartKeyBR(partKeyBaseOnHeap, partKeyOffset) match { - case None => val group = partKeyGroup(dataset.partKeySchema, partKeyBaseOnHeap, partKeyOffset, numGroups) + if (partIdMap.get(partKeyBytesRef).isEmpty) { + val partId = if (endTime == Long.MaxValue) { + // this is an actively ingesting partition + val group = partKeyGroup(dataset.partKeySchema, partKeyBaseOnHeap, partKeyOffset, numGroups) val part = createNewPartition(partKeyBaseOnHeap, partKeyOffset, group, CREATE_NEW_PARTID, 4) // In theory, we should not get an OutOfMemPartition here since - // it should have occurred before node failed too, and with data sropped, + // it should have occurred before node failed too, and with data stopped, // index would not be updated. But if for some reason we see it, drop data if (part == OutOfMemPartition) { logger.error("Could not accommodate partKey while recovering index. " + "WriteBuffer size may not be configured correctly") - -1 + None } else { val stamp = partSetLock.writeLock() try { partSet.add(part) // createNewPartition doesn't add part to partSet + Some(part.partID) } finally { partSetLock.unlockWrite(stamp) } - part.partID } - case Some(p) => p.partID - } - if (partId != -1) { - // upsert into lucene since there can be multiple records for one partKey, and most recent wins. - partKeyIndex.upsertPartKey(partKeyBaseOnHeap, partId, startTime, endTime, - PartKeyLuceneIndex.unsafeOffsetToBytesRefOffset(partKeyOffset))(partKeyNumBytes) - timeBucketBitmaps.get(segment.timeBucket).set(partId) - activelyIngesting.synchronized { - if (endTime == Long.MaxValue) activelyIngesting.set(partId) else activelyIngesting.clear(partId) + } else { + // partition assign a new partId to non-ingesting partition, + // but no need to create a new TSPartition heap object + val id = nextPartitionID + incrementPartitionID() + Some(id) } + + // add newly assigned partId to lucene index + partId.foreach { partId => + partIdMap(partKeyBytesRef) = partId + partKeyIndex.addPartKey(partKeyBaseOnHeap, partId, startTime, endTime, + PartKeyLuceneIndex.unsafeOffsetToBytesRefOffset(partKeyOffset))(partKeyNumBytes) + timeBucketBitmaps.get(segment.timeBucket).set(partId) + activelyIngesting.synchronized { + if (endTime == Long.MaxValue) activelyIngesting.set(partId) + else activelyIngesting.clear(partId) + } + } + } else { + // partId has already been assigned for this partKey because we previously processed a later record in time. + // Time buckets are processed in reverse order, and given last one wins and is used for index, + // we skip this record and move on. } numRecordsProcessed += 1 } shardStats.indexRecoveryNumRecordsProcessed.increment(numRecordsProcessed) - logger.info(s"Recovered partition keys from timebucket for dataset=${dataset.ref} shard=$shardNum" + - s" timebucket=${segment.timeBucket} segment=${segment.segment} numRecordsProcessed=$numRecordsProcessed") + logger.info(s"Recovered partKeys for dataset=${dataset.ref} shard=$shardNum" + + s" timebucket=${segment.timeBucket} segment=${segment.segment} numRecordsInBucket=$numRecordsProcessed" + + s" numPartsInIndex=${partIdMap.size} numIngestingParts=${partitions.size}") } def indexNames: Iterator[String] = partKeyIndex.indexNames @@ -902,7 +930,6 @@ class TimeSeriesShard(val dataset: Dataset, activelyIngesting.clear(p.partID) } else if (partFlushChunks.nonEmpty && !activelyIngesting.get(p.partID)) { // Partition started re-ingesting. - // TODO: we can do better than this for intermittent time series. Address later. updatePartEndTimeInIndex(p, Long.MaxValue) timeBucketBitmaps.get(timeBucket).set(p.partID) activelyIngesting.set(p.partID) diff --git a/core/src/main/scala/filodb.core/metadata/Dataset.scala b/core/src/main/scala/filodb.core/metadata/Dataset.scala index b7725dfc69..decc5629d2 100644 --- a/core/src/main/scala/filodb.core/metadata/Dataset.scala +++ b/core/src/main/scala/filodb.core/metadata/Dataset.scala @@ -12,7 +12,7 @@ import filodb.core.downsample.ChunkDownsampler import filodb.core.query.ColumnInfo import filodb.core.store.ChunkSetInfo import filodb.memory.{BinaryRegion, MemFactory} -import filodb.memory.format.{BinaryVector, RowReader, TypedIterator} +import filodb.memory.format.{BinaryVector, RowReader, TypedIterator, ZeroCopyUTF8String => ZCUTF8} /** * A dataset describes the schema (column name & type) and distribution for a stream/set of data. @@ -169,6 +169,7 @@ case class DatasetOptions(shardKeyColumns: Seq[String], val nonMetricShardColumns = shardKeyColumns.filterNot(_ == metricColumn).sorted val nonMetricShardKeyBytes = nonMetricShardColumns.map(_.getBytes).toArray + val nonMetricShardKeyUTF8 = nonMetricShardColumns.map(ZCUTF8.apply).toArray val nonMetricShardKeyHash = nonMetricShardKeyBytes.map(BinaryRegion.hash32) val ignorePartKeyHashTags = ignoreTagsOnPartitionKeyHash.toSet diff --git a/core/src/main/scala/filodb.core/zipkin/Zipkin.scala b/core/src/main/scala/filodb.core/zipkin/Zipkin.scala index 3ccc721a1d..b68bf2d85b 100644 --- a/core/src/main/scala/filodb.core/zipkin/Zipkin.scala +++ b/core/src/main/scala/filodb.core/zipkin/Zipkin.scala @@ -139,8 +139,8 @@ class ZipkinReporter extends SpanReporter { OkHttpSender.newBuilder() .encoding(Encoding.JSON) .endpoint(url) - .maxRequests(if (maxRequests > 0) maxRequests else 128) - .messageMaxBytes(if (messageMaxBytes > 0) messageMaxBytes else 1024 * 256) + .maxRequests(maxRequests) + .messageMaxBytes(messageMaxBytes) .build() ) } diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala index c7c273bfd0..a59f323bf5 100644 --- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala +++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala @@ -6,6 +6,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import monix.execution.ExecutionModel.BatchedExecution import monix.reactive.Observable +import org.apache.lucene.util.BytesRef import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} @@ -85,7 +86,7 @@ class TimeSeriesMemStoreSpec extends FunSpec with Matchers with BeforeAndAfter w memStore.commitIndexForTesting(dataset1.ref) val split = memStore.getScanSplits(dataset1.ref, 1).head val agg1 = memStore.scanRows(dataset1, Seq(1), FilteredPartitionScan(split)).map(_.getDouble(0)).sum - agg1 shouldEqual ((1 to 20).map(_.toDouble).sum) + agg1 shouldEqual (1 to 20).map(_.toDouble).sum } it("should ingest map/tags column as partition key and aggregate") { @@ -272,6 +273,9 @@ class TimeSeriesMemStoreSpec extends FunSpec with Matchers with BeforeAndAfter w tsShard.timeBucketBitmaps.keySet.asScala.toSeq.sorted shouldEqual 19.to(25) // 6 buckets retained + one for current } + /** + * Tries to write partKeys into time bucket record container and extracts them back into the shard + */ def indexRecoveryTest(dataset: Dataset, partKeys: Seq[Long]): Unit = { memStore.metastore.writeHighestIndexTimeBucket(dataset.ref, 0, 0) memStore.setup(dataset, 0, @@ -282,24 +286,33 @@ class TimeSeriesMemStoreSpec extends FunSpec with Matchers with BeforeAndAfter w partKeys.zipWithIndex.foreach { case (off, i) => timeBucketRb.startNewRecord() - timeBucketRb.addLong(i + 10) - timeBucketRb.addLong(i + 20) + timeBucketRb.addLong(i + 10) // startTime + timeBucketRb.addLong(if (i%2 == 0) Long.MaxValue else i + 20) // endTime val numBytes = BinaryRegionLarge.numBytes(UnsafeUtils.ZeroPointer, off) - timeBucketRb.addBlob(UnsafeUtils.ZeroPointer, off, numBytes + 4) + timeBucketRb.addBlob(UnsafeUtils.ZeroPointer, off, numBytes + 4) // partKey timeBucketRb.endRecord(false) } tsShard.initTimeBuckets() + val partIdMap = debox.Map.empty[BytesRef, Int] + timeBucketRb.optimalContainerBytes(true).foreach { bytes => - tsShard.extractTimeBucket(new IndexData(1, 0, RecordContainer(bytes))) + tsShard.extractTimeBucket(new IndexData(1, 0, RecordContainer(bytes)), partIdMap) } tsShard.commitPartKeyIndexBlocking() + partIdMap.size shouldEqual partKeys.size partKeys.zipWithIndex.foreach { case (off, i) => val readPartKey = tsShard.partKeyIndex.partKeyFromPartId(i).get val expectedPartKey = dataset1.partKeySchema.asByteArray(UnsafeUtils.ZeroPointer, off) - readPartKey.bytes.drop(readPartKey.offset).take(readPartKey.length) shouldEqual expectedPartKey - tsShard.partitions.get(i).partKeyBytes shouldEqual expectedPartKey - tsShard.partSet.getWithPartKeyBR(UnsafeUtils.ZeroPointer, off).get.partID shouldEqual i + readPartKey.bytes.slice(readPartKey.offset, readPartKey.offset + readPartKey.length) shouldEqual expectedPartKey + if (i%2 == 0) { + tsShard.partSet.getWithPartKeyBR(UnsafeUtils.ZeroPointer, off).get.partID shouldEqual i + tsShard.partitions.containsKey(i) shouldEqual true // since partition is ingesting + } + else { + tsShard.partSet.getWithPartKeyBR(UnsafeUtils.ZeroPointer, off) shouldEqual None + tsShard.partitions.containsKey(i) shouldEqual false // since partition is not ingesting + } } } diff --git a/dev-gateway.sh b/dev-gateway.sh index f4d4d45764..bfa98bd9f4 100755 --- a/dev-gateway.sh +++ b/dev-gateway.sh @@ -1,2 +1,8 @@ #!/usr/bin/env bash -java -cp gateway/target/scala-2.11/gateway-* filodb.gateway.GatewayServer conf/timeseries-dev-source.conf & \ No newline at end of file +# +# Starts a local Gateway for ingesting data into FiloDB (run with no options) +# Type --help to see options - options include generating random test data and exiting. +args=${@:-"conf/timeseries-dev-source.conf"} +java -Dconfig.file=conf/timeseries-filodb-server.conf \ + -Dkamon.prometheus.embedded-server.port=9097 \ + -cp gateway/target/scala-2.11/gateway-* filodb.gateway.GatewayServer $args & \ No newline at end of file diff --git a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala index e730b9ddd8..49e5f6712e 100644 --- a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala +++ b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala @@ -4,8 +4,8 @@ import java.net.InetSocketAddress import java.nio.charset.Charset import java.util.concurrent.Executors +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ -import scala.concurrent.Future import scala.util.control.NonFatal import com.typesafe.config.{Config, ConfigFactory} @@ -25,25 +25,42 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.ssl.SslContext import org.jboss.netty.handler.ssl.util.SelfSignedCertificate import org.jctools.queues.MpscGrowableArrayQueue +import org.rogach.scallop._ -import filodb.coordinator.{GlobalConfig, ShardMapper} +import filodb.coordinator.{FilodbSettings, GlobalConfig, ShardMapper, StoreFactory} import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.Dataset import filodb.gateway.conversion._ import filodb.memory.MemFactory -import filodb.prometheus.FormatConversion +import filodb.timeseries.TestTimeseriesProducer /** * Gateway server to ingest source streams of data, shard, batch, and write output to Kafka * built using high performance Netty TCP code * - * It takes exactly one arg: the source config file which contains # Kafka partitions/shards and other config - * Also pass in -Dconfig.file=.... as usual + * It usually takes one arg: the source config file which contains # Kafka partitions/shards and other config + * Also pass in -Dconfig.file=.... as usual, with a config that points to the dataset metadata. + * For local setups, simply run `./dev-gateway.sh`. + * For help pass in `--help`. + * + * NOTE: set `kamon.prometheus.embedded-server.port` to avoid conflicting with FiloDB itself. + * + * There are options that can be used to generate test data, such as `--gen-hist-data`. The -n and -p options can + * also be used together to control the # of samples per series and # of time series. + * To generate Histogram schema test data, one must create the following dataset: + * ./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --command create --dataset histogram \ + * --dataColumns timestamp:ts,sum:long,count:long,h:hist --partitionColumns metric:string,tags:map \ + * --shardKeyColumns metric --metricColumn metric + * create a Kafka topic: + * kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic histogram-dev + * and use the `conf/histogram-dev-source.conf` config file. + * Oh, and you have to observe on shards 1 and 3. */ object GatewayServer extends StrictLogging { // Get global configuration using universal FiloDB/Akka-based config val config = GlobalConfig.systemConfig + val storeFactory = StoreFactory(new FilodbSettings(config), Scheduler.io()) // ==== Metrics ==== val numInfluxMessages = Kamon.counter("num-influx-messages") @@ -52,21 +69,29 @@ object GatewayServer extends StrictLogging { val numContainersSent = Kamon.counter("num-containers-sent") val containersSize = Kamon.histogram("containers-size-bytes") + // Most options are for generating test data + class GatewayOptions(args: Seq[String]) extends ScallopConf(args) { + val samplesPerSeries = opt[Int](short = 'n', default = Some(100), + descr="# of samples per time series") + val numSeries = opt[Int](short='p', default = Some(20), descr="# of total time series") + val sourceConfigPath = trailArg[String](descr="Path to source config, eg conf/timeseries-dev-source.conf") + val genHistData = toggle(noshort=true, descrYes="Generate histogram-schema test data and exit") + val genPromData = toggle(noshort=true, descrYes="Generate Prometheus-schema test data and exit") + verify() + } + + //scalastyle:off method.length def main(args: Array[String]): Unit = { Kamon.loadReportersFromConfig() + val userOpts = new GatewayOptions(args) + val numSamples = userOpts.samplesPerSeries() * userOpts.numSeries() + val numSeries = userOpts.numSeries() - if (args.length < 1) { - //scalastyle:off - println("Arguments: [path/to/source-config.conf]") - //scalastyle:on - sys.exit(1) - } - - val sourceConfig = ConfigFactory.parseFile(new java.io.File(args.head)) + val sourceConfig = ConfigFactory.parseFile(new java.io.File(userOpts.sourceConfigPath())) val numShards = sourceConfig.getInt("num-shards") - // TODO: get the dataset from source config and read the definition from Metastore - val dataset = FormatConversion.dataset + val datasetStr = sourceConfig.getString("dataset") + val dataset = Await.result(storeFactory.metaStore.getDataset(datasetStr), 30.seconds) // NOTE: the spread MUST match the default spread used in the HTTP module for consistency between querying // and ingestion sharding @@ -97,8 +122,33 @@ object GatewayServer extends StrictLogging { // TODO: allow configurable sinks, maybe multiple sinks for say writing to multiple Kafka clusters/DCs setupKafkaProducer(sourceConfig, containerStream) - setupTCPService(config, calcShardAndQueueHandler) + + val genHist = userOpts.genHistData.getOrElse(false) + val genProm = userOpts.genPromData.getOrElse(false) + if (genHist || genProm) { + val startTime = System.currentTimeMillis + logger.info(s"Generating $numSamples samples starting at $startTime....") + + val stream = if (genHist) TestTimeseriesProducer.genHistogramData(startTime, dataset, numSeries) + else TestTimeseriesProducer.timeSeriesData(startTime, numSeries) + + stream.take(numSamples).foreach { rec => + val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread) + if (!shardQueues(shard).offer(rec)) { + // Prioritize recent data. This means dropping messages when full, so new data may have a chance. + logger.warn(s"Queue for shard=$shard is full. Dropping data.") + numDroppedMessages.increment + } + } + Thread sleep 10000 + TestTimeseriesProducer.logQueryHelp(numSamples, numSeries, startTime) + logger.info(s"Waited for containers to be sent, exiting...") + sys.exit(0) + } else { + setupTCPService(config, calcShardAndQueueHandler) + } } + //scalastyle:on method.length def setupTCPService(config: Config, handler: ChannelBuffer => Unit): Unit = { val influxPort = config.getInt("gateway.influx-port") diff --git a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala index c6107ca38d..b31afdcaf8 100644 --- a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala +++ b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala @@ -7,6 +7,7 @@ import scalaxy.loops._ import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.Dataset +import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String => ZCUTF8} /** * An InputRecord represents one "record" of timeseries data for input to FiloDB system. @@ -124,4 +125,34 @@ object PrometheusInputRecord { } tags ++ extraTags } -} \ No newline at end of file +} + +/** + * A generic InputRecord that can serve different data schemas, so long as the partition key consists of: + * - a single metric StringColumn + * - a MapColumn of tags + * + * Can be used to adapt custom dataset/schemas for input into FiloDB using the gateway. + * Not going to be the fastest InputRecord but extremely flexible. + * + * @param values the data column values, first one is probably timestamp + */ +class MetricTagInputRecord(values: Seq[Any], + metric: String, + tags: Map[ZCUTF8, ZCUTF8], + dataset: Dataset) extends InputRecord { + final def shardKeyHash: Int = RecordBuilder.shardKeyHash(nonMetricShardValues, metric) + // NOTE: this is probably not very performant right now. + final def partitionKeyHash: Int = tags.hashCode + + val nonMetricShardValues: Seq[String] = + dataset.options.nonMetricShardKeyUTF8.flatMap(tags.get).map(_.toString).toSeq + final def getMetric: String = metric + + def addToBuilder(builder: RecordBuilder): Unit = { + require(builder.schema == dataset.ingestionSchema) + + val reader = SeqRowReader(values :+ metric :+ tags) + builder.addFromReader(reader) + } +} diff --git a/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala b/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala index ddcfba0c7d..32ce1f7a1c 100644 --- a/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala +++ b/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala @@ -3,67 +3,33 @@ package filodb.timeseries import java.net.URLEncoder import java.nio.charset.StandardCharsets -import scala.concurrent.{Await, Future} +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random -import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging import monix.reactive.Observable -import org.rogach.scallop._ import filodb.coordinator.{GlobalConfig, ShardMapper} -import filodb.core.metadata.Dataset +import filodb.core.metadata.{Column, Dataset} import filodb.gateway.GatewayServer -import filodb.gateway.conversion.PrometheusInputRecord +import filodb.gateway.conversion.{InputRecord, MetricTagInputRecord, PrometheusInputRecord} +import filodb.memory.format.{vectors => bv, ZeroCopyUTF8String => ZCUTF8} import filodb.prometheus.FormatConversion -sealed trait DataOrCommand -final case class DataSample(tags: Map[String, String], - metric: String, - timestamp: Long, - value: Double) extends DataOrCommand -case object FlushCommand extends DataOrCommand - /** - * Simple driver to produce time series data into local kafka similar. Data format is similar to - * prometheus metric sample. - * This is for development testing purposes only. TODO: Later evolve this to accept prometheus formats. - * - * Run as `java -cp classpath filodb.timeseries.TestTimeseriesProducer --help` - * + * Utilities to produce time series data into local Kafka for development testing. + * Please see GatewayServer for the app to run, or README for docs. */ object TestTimeseriesProducer extends StrictLogging { val dataset = FormatConversion.dataset - class ProducerOptions(args: Seq[String]) extends ScallopConf(args) { - val samplesPerSeries = opt[Int](short = 'n', default = Some(100), - descr="# of samples per time series") - val startMinutesAgo = opt[Int](short='t') - val numSeries = opt[Int](short='p', default = Some(20), descr="# of total time series") - val sourceConfigPath = opt[String](required = true, short = 'c', - descr="Path to source conf file eg conf/timeseries-dev-source.conf") - verify() - } - val oneBitMask = 0x1 val twoBitMask = 0x3 val rand = Random // start from a random day in the last 5 years - def main(args: Array[String]): Unit = { - val conf = new ProducerOptions(args) - val sourceConfig = ConfigFactory.parseFile(new java.io.File(conf.sourceConfigPath())) - val numSamples = conf.samplesPerSeries() * conf.numSeries() - val numTimeSeries = conf.numSeries() - // to get default start time, look at numSamples and calculate a startTime that ends generation at current time - val startMinutesAgo = conf.startMinutesAgo.toOption - .getOrElse((numSamples.toDouble / numTimeSeries / 6).ceil.toInt ) // at 6 samples per minute - - Await.result(produceMetrics(sourceConfig, numSamples, numTimeSeries, startMinutesAgo), 1.hour) - sys.exit(0) - } - import scala.concurrent.ExecutionContext.Implicits.global /** @@ -89,36 +55,36 @@ object TestTimeseriesProducer extends StrictLogging { s"from about ${(System.currentTimeMillis() - startTime) / 1000 / 60} minutes ago") producingFut.map { _ => - // Give enough time for the last containers to be sent off successfully to Kafka - Thread sleep 2000 - logger.info(s"Finished producing $numSamples messages into topic $topicName with timestamps " + - s"from about ${(System.currentTimeMillis() - startTime) / 1000 / 60} minutes ago at $startTime") - val startQuery = startTime / 1000 - val endQuery = startQuery + 300 - val query = - s"""./filo-cli '-Dakka.remote.netty.tcp.hostname=127.0.0.1' --host 127.0.0.1 --dataset prometheus """ + - s"""--promql 'heap_usage{dc="DC0",_ns="App-0"}' --start $startQuery --end $endQuery --limit 15""" - logger.info(s"Periodic Samples CLI Query : \n$query") - - val periodQuery = URLEncoder.encode("""heap_usage{dc="DC0",_ns="App-0"}""", StandardCharsets.UTF_8.toString) - val periodicSamplesUrl = s"http://localhost:8080/promql/prometheus/api/v1/query_range?" + - s"query=$periodQuery&start=$startQuery&end=$endQuery&step=15" - logger.info(s"Periodic Samples query URL: \n$periodicSamplesUrl") - - val rawQuery = URLEncoder.encode("""heap_usage{dc="DC0",_ns="App-0"}[2m]""", - StandardCharsets.UTF_8.toString) - val rawSamplesUrl = s"http://localhost:8080/promql/prometheus/api/v1/query?query=$rawQuery&time=$endQuery" - logger.info(s"Raw Samples query URL: \n$rawSamplesUrl") - - val downsampleSumQuery = URLEncoder.encode("""heap_usage{dc="DC0",_ns="App-0",__col__="sum"}[2m]""", - StandardCharsets.UTF_8.toString) - val downsampledSamplesUrl = s"http://localhost:8080/promql/prometheus_ds_1m/api/v1/query?" + - s"query=$downsampleSumQuery&time=$endQuery" - logger.info(s"Downsampled Samples query URL: \n$downsampledSamplesUrl") - + logQueryHelp(numSamples, numTimeSeries, startTime) } } + + def logQueryHelp(numSamples: Int, numTimeSeries: Int, startTime: Long): Unit = { + val samplesDuration = (numSamples.toDouble / numTimeSeries / 6).ceil.toInt * 60L * 1000L + + logger.info(s"Finished producing $numSamples records for ${samplesDuration / 1000} seconds") + val startQuery = startTime / 1000 + val endQuery = startQuery + 300 + val query = + s"""./filo-cli '-Dakka.remote.netty.tcp.hostname=127.0.0.1' --host 127.0.0.1 --dataset prometheus """ + + s"""--promql 'heap_usage{dc="DC0",_ns="App-0"}' --start $startQuery --end $endQuery --limit 15""" + logger.info(s"Periodic Samples CLI Query : \n$query") + + val q = URLEncoder.encode("""heap_usage{dc="DC0",_ns="App-0"}[2m]""", StandardCharsets.UTF_8.toString) + val periodicSamplesUrl = s"http://localhost:8080/promql/prometheus/api/v1/query_range?" + + s"query=$q&start=$startQuery&end=$endQuery&step=15" + logger.info(s"Periodic Samples query URL: \n$periodicSamplesUrl") + + val q2 = URLEncoder.encode("""heap_usage{dc="DC0",_ns="App-0",__col__="sum"}[2m]""", + StandardCharsets.UTF_8.toString) + val rawSamplesUrl = s"http://localhost:8080/promql/prometheus/api/v1/query?query=$q2&time=$endQuery" + logger.info(s"Raw Samples query URL: \n$rawSamplesUrl") + val downsampledSamplesUrl = s"http://localhost:8080/promql/prometheus_ds_1m/api/v1/query?" + + s"query=$q2&time=$endQuery" + logger.info(s"Downsampled Samples query URL: \n$downsampledSamplesUrl") + } + def metricsToContainerStream(startTime: Long, numShards: Int, numTimeSeries: Int, @@ -129,10 +95,9 @@ object TestTimeseriesProducer extends StrictLogging { val (shardQueues, containerStream) = GatewayServer.shardingPipeline(GlobalConfig.systemConfig, numShards, dataset) val producingFut = Future { - timeSeriesData(startTime, numShards, numTimeSeries) + timeSeriesData(startTime, numTimeSeries) .take(numSamples) - .foreach { sample => - val rec = PrometheusInputRecord(sample.tags, sample.metric, dataset, sample.timestamp, sample.value) + .foreach { rec => val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread) while (!shardQueues(shard).offer(rec)) { Thread sleep 50 } } @@ -141,14 +106,13 @@ object TestTimeseriesProducer extends StrictLogging { } /** - * Generate time series data. + * Generate Prometheus-schema time series data. * * @param startTime Start time stamp - * @param numShards the number of shards or Kafka partitions * @param numTimeSeries number of instances or time series * @return stream of a 2-tuple (kafkaParitionId , sampleData) */ - def timeSeriesData(startTime: Long, numShards: Int, numTimeSeries: Int = 16): Stream[DataSample] = { + def timeSeriesData(startTime: Long, numTimeSeries: Int = 16): Stream[InputRecord] = { // TODO For now, generating a (sinusoidal + gaussian) time series. Other generators more // closer to real world data can be added later. Stream.from(0).map { n => @@ -165,7 +129,57 @@ object TestTimeseriesProducer extends StrictLogging { "partition" -> s"partition-$partition", "host" -> s"H$host", "instance" -> s"Instance-$instance") - DataSample(tags, "heap_usage", timestamp, value) + + PrometheusInputRecord(tags, "heap_usage", dataset, timestamp, value) + } + } + + import ZCUTF8._ + import Column.ColumnType._ + + val dcUTF8 = "dc".utf8 + val nsUTF8 = "_ns".utf8 + val partUTF8 = "partition".utf8 + val hostUTF8 = "host".utf8 + val instUTF8 = "instance".utf8 + + /** + * Generate a stream of random Histogram data, with the metric name "http_request_latency" + * Schema: (timestamp:ts, sum:long, count:long, h:hist) for data, plus (metric:string, tags:map) + * The dataset must match the above schema + */ + def genHistogramData(startTime: Long, dataset: Dataset, numTimeSeries: Int = 16): Stream[InputRecord] = { + require(dataset.dataColumns.map(_.columnType) == Seq(TimestampColumn, LongColumn, LongColumn, HistogramColumn)) + val numBuckets = 10 + + val histBucketScheme = bv.GeometricBuckets(2.0, 3.0, numBuckets) + val buckets = new Array[Long](numBuckets) + def updateBuckets(bucketNo: Int): Unit = { + for { b <- bucketNo until numBuckets } { + buckets(b) += 1 + } + } + + Stream.from(0).map { n => + val instance = n % numTimeSeries + val dc = instance & oneBitMask + val partition = (instance >> 1) & twoBitMask + val app = (instance >> 3) & twoBitMask + val host = (instance >> 4) & twoBitMask + val timestamp = startTime + (n.toLong / numTimeSeries) * 10000 // generate 1 sample every 10s for each instance + + updateBuckets(n % numBuckets) + val hist = bv.LongHistogram(histBucketScheme, buckets.map(x => x)) + val count = util.Random.nextInt(100).toLong + val sum = buckets.sum + + val tags = Map(dcUTF8 -> s"DC$dc".utf8, + nsUTF8 -> s"App-$app".utf8, + partUTF8 -> s"partition-$partition".utf8, + hostUTF8 -> s"H$host".utf8, + instUTF8 -> s"Instance-$instance".utf8) + + new MetricTagInputRecord(Seq(timestamp, sum, count, hist), "http_request_latency", tags, dataset) } } } diff --git a/http/src/test/scala/filodb/http/ClusterApiRouteSpec.scala b/http/src/test/scala/filodb/http/ClusterApiRouteSpec.scala index f0e054b0e7..31d0efc46d 100644 --- a/http/src/test/scala/filodb/http/ClusterApiRouteSpec.scala +++ b/http/src/test/scala/filodb/http/ClusterApiRouteSpec.scala @@ -1,7 +1,5 @@ package filodb.http -import scala.concurrent.duration._ - import akka.actor.ActorSystem import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{StatusCodes, ContentTypes} @@ -83,21 +81,9 @@ class ClusterApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncTest it("should return shard status after dataset is setup") { setupDataset() - // Repeatedly query cluster status until we know it is OK - var statuses: Seq[ShardStatus] = Nil - do { - probe.send(clusterProxy, NodeClusterActor.GetShardMap(dataset6.ref)) - Thread sleep 500 - statuses = probe.expectMsgPF(3.seconds) { - case CurrentShardSnapshot(_, mapper) => mapper.statuses - } - println(s"Current statuses = $statuses") - info(s"Current statuses = $statuses") - if (statuses.exists(_ == ShardStatusError)) { - info(s"ERROR in status, breaking") - throw new RuntimeException(s"Got error in statuses $statuses") - } - } while (statuses.take(2) != Seq(ShardStatusActive, ShardStatusActive)) + + // Give the coordinator nodes some time to get started + Thread sleep 1000 Get(s"/api/v1/cluster/${dataset6.ref}/status") ~> clusterRoute ~> check { handled shouldBe true @@ -106,7 +92,8 @@ class ClusterApiRouteSpec extends FunSpec with ScalatestRouteTest with AsyncTest val resp = responseAs[HttpList[HttpShardState]] resp.status shouldEqual "success" resp.data should have length 4 - resp.data.map(_.status).filter(_ contains "Active") should have length 2 // Two active nodes + // Exact status of assigned nodes doesn't matter much. This is an HTTP route test, not a sharding test + resp.data.map(_.status).filter(_ contains "Unassigned") should have length 2 // Two unassigned nodes } } diff --git a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala index e416ce8983..f52faf427f 100644 --- a/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/PartKeyIndexBenchmark.scala @@ -24,17 +24,9 @@ class PartKeyIndexBenchmark { val dataset = FormatConversion.dataset val partKeyIndex = new PartKeyLuceneIndex(dataset, 0, TestData.storeConf) val numSeries = 1000000 - val partKeyData = TestTimeseriesProducer.timeSeriesData(0, 1, numSeries) take numSeries + val partKeyData = TestTimeseriesProducer.timeSeriesData(0, numSeries) take numSeries val partKeyBuilder = new RecordBuilder(MemFactory.onHeapFactory, dataset.partKeySchema) - partKeyData.foreach { s => - partKeyBuilder.startNewRecord() - partKeyBuilder.startMap() - s.tags.foreach { case (k, v) => - partKeyBuilder.addMapKeyValue(k.getBytes(), v.getBytes()) - } - partKeyBuilder.endMap() - partKeyBuilder.endRecord() - } + partKeyData.foreach(_.addToBuilder(partKeyBuilder)) var partId = 1 val now = System.currentTimeMillis() diff --git a/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala index 6f980ee40e..8a0df92511 100644 --- a/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala +++ b/jmh/src/main/scala/filodb.jmh/QueryAndIngestBenchmark.scala @@ -41,12 +41,12 @@ class QueryAndIngestBenchmark extends StrictLogging { import NodeClusterActor._ val numShards = 2 - val numSamples = 720 // 2 hours * 3600 / 10 sec interval - val numSeries = 100 + val numSamples = 1080 // 3 hours * 3600 / 10 sec interval + val numSeries = 800 var startTime = System.currentTimeMillis - (3600*1000) - val numQueries = 500 // Please make sure this number matches the OperationsPerInvocation below - val queryIntervalMin = 55 // # minutes between start and stop - val queryStep = 60 // # of seconds between each query sample "step" + val numQueries = 200 // Please make sure this number matches the OperationsPerInvocation below + val queryIntervalMin = 180 // # minutes between start and stop + val queryStep = 60 // # of seconds between each query sample "step" val spread = 1 // TODO: move setup and ingestion to another trait @@ -100,12 +100,11 @@ class QueryAndIngestBenchmark extends StrictLogging { val shards = (0 until numShards).map { s => memstore.getShardE(dataset.ref, s) } private def ingestSamples(noSamples: Int): Future[Unit] = Future { - TestTimeseriesProducer.timeSeriesData(startTime, numShards, numSeries) + TestTimeseriesProducer.timeSeriesData(startTime, numSeries) .take(noSamples * numSeries) - .foreach { sample => - val rec = PrometheusInputRecord(sample.tags, sample.metric, dataset, sample.timestamp, sample.value) + .foreach { rec => // we shouldn't ingest samples for same timestamps repeatedly. This will also result in out-of-order samples. - startTime = Math.max(startTime, sample.timestamp + 10000) + startTime = Math.max(startTime, rec.asInstanceOf[PrometheusInputRecord].timestamp + 10000) val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread) while (!shardQueues(shard).offer(rec)) { Thread sleep 50 } } @@ -122,14 +121,14 @@ class QueryAndIngestBenchmark extends StrictLogging { * They are designed to match all the time series (common case) under a particular metric and job */ val queries = Seq("heap_usage{_ns=\"App-2\"}", // raw time series - """sum(rate(heap_usage{_ns="App-2"}[5m]))""", + """sum(rate(heap_usage{_ns="App-2"}[1m]))""", """quantile(0.75, heap_usage{_ns="App-2"})""", - """sum_over_time(heap_usage{_ns="App-2"}[5m])""") + """sum_over_time(heap_usage{_ns="App-2"}[1m])""") val queryTime = startTime + (5 * 60 * 1000) // 5 minutes from start until 60 minutes from start val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60) val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) } val queryCommands = logicalPlans.map { plan => - LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 20000)) + LogicalPlan2Query(dataset.ref, plan, QueryOptions(1, 1000000)) } private var testProducingFut: Option[Future[Unit]] = None @@ -156,7 +155,7 @@ class QueryAndIngestBenchmark extends StrictLogging { @Benchmark @BenchmarkMode(Array(Mode.Throughput)) @OutputTimeUnit(TimeUnit.SECONDS) - @OperationsPerInvocation(500) + @OperationsPerInvocation(200) def parallelQueries(): Unit = { val futures = (0 until numQueries).map { n => val f = asyncAsk(coordinator, queryCommands(n % queryCommands.length)) diff --git a/kafka/src/main/scala/filodb/kafka/TestConsumer.scala b/kafka/src/main/scala/filodb/kafka/TestConsumer.scala index 9daa4a4d49..4e814abf1d 100644 --- a/kafka/src/main/scala/filodb/kafka/TestConsumer.scala +++ b/kafka/src/main/scala/filodb/kafka/TestConsumer.scala @@ -4,33 +4,44 @@ import scala.concurrent.Await import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory +import monix.execution.Scheduler +import filodb.coordinator.{FilodbSettings, IngestionStreamFactory, StoreFactory} import filodb.core.memstore.SomeData -import filodb.core.metadata.Dataset +import filodb.core.store.IngestionConfig /** - * A simple app which uses the KafkaIngestionStream plus a sourceconfig of your choice to test reading - * data from Kafka and test reading from certain offsets. + * A simple app which uses a sourceconfig of your choice to test reading + * data from Kafka (or whatever configured source factory) and test reading from certain offsets. + * It reads dataset definition from MetaStore, so please pass the server.conf with Cassandra/metastore details. * - * To launch: java -Xmx4G -cp /standalone-assembly-0.7.0.jar filodb.kafka.TestConsumer \ - * my-kafka-sourceconfig.conf + * To launch: java -Xmx4G -Dconfig.file=conf/timeseries-filodb-server.conf \ + * -cp /standalone-assembly-0.7.0.jar filodb.kafka.TestConsumer \ + * my-kafka-sourceconfig.conf * It will read 10 records and then quit, printing out the offsets of each record. * Optional: pass in a second arg which is the offset to seek to. */ object TestConsumer extends App { + val settings = new FilodbSettings() + val storeFactory = StoreFactory(settings, Scheduler.io()) + val sourceConfPath = args(0) val offsetOpt = args.drop(1).headOption.map(_.toLong) + val shard = if (args.length > 1) args(1).toInt else 0 val sourceConf = ConfigFactory.parseFile(new java.io.File(sourceConfPath)) //scalastyle:off - println(s"TestConsumer starting with config $sourceConf\nand offset $offsetOpt") + println(s"TestConsumer starting with shard $shard, config $sourceConf\nand offset $offsetOpt") import monix.execution.Scheduler.Implicits.global - // For now, hard code dataset to a Prometheus like dataset - // TODO: allow specification of dataset, then load from the MetaStore - val dataset = Dataset("prometheus", Seq("tags:map"), Seq("timestamp:long", "value:double")) - val stream = new KafkaIngestionStream(sourceConf, dataset, 0, offsetOpt) + val ingestConf = IngestionConfig(sourceConf, classOf[KafkaIngestionStreamFactory].getClass.getName).get + val dataset = Await.result(storeFactory.metaStore.getDataset(ingestConf.ref), 30.seconds) + + val ctor = Class.forName(ingestConf.streamFactoryClass).getConstructors.head + val streamFactory = ctor.newInstance().asInstanceOf[IngestionStreamFactory] + + val stream = streamFactory.create(sourceConf, dataset, shard, offsetOpt) val fut = stream.get.take(10) .foreach { case SomeData(container, offset) => println(s"\n----- Offset $offset -----") diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index c4cd7e0197..ee8ac39214 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -116,16 +116,7 @@ trait Histogram extends Ordered[Histogram] { } object Histogram { - val empty = new Histogram { - final def numBuckets: Int = 0 - final def bucketTop(no: Int): Double = ??? - final def bucketValue(no: Int): Double = ??? - final def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = { - val buf = intoBuf.getOrElse(BinaryHistogram.histBuf) - BinaryHistogram.writeNonIncreasing(HistogramBuckets.emptyBuckets, Array[Long](), buf) - buf - } - } + val empty = MutableHistogram(HistogramBuckets.emptyBuckets, Array.empty) } trait HistogramWithBuckets extends Histogram { @@ -176,7 +167,7 @@ final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Doubl /** * Copies this histogram as a new copy so it can be used for aggregation or mutation. Allocates new storage. */ - final def copy: Histogram = MutableHistogram(buckets, values.clone) + final def copy: MutableHistogram = MutableHistogram(buckets, values.clone) /** * Adds the values from another Histogram. diff --git a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala index cc23454556..933d4e8bbb 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala @@ -279,7 +279,7 @@ class AppendableHistogramVector(factory: MemFactory, trait HistogramReader extends VectorDataReader { def buckets: HistogramBuckets - def apply(index: Int): Histogram + def apply(index: Int): HistogramWithBuckets def sum(start: Int, end: Int): MutableHistogram } @@ -372,7 +372,7 @@ class RowHistogramReader(histVect: Ptr.U8) extends HistogramReader { def length(addr: BinaryVectorPtr): Int = length // WARNING: histogram returned is shared between calls, do not reuse! - final def apply(index: Int): Histogram = { + final def apply(index: Int): HistogramWithBuckets = { require(length > 0) val histPtr = locate(index) val histLen = histPtr.asU16.getU16 diff --git a/project/FiloBuild.scala b/project/FiloBuild.scala index b6a1607cdf..8ff4f496cb 100644 --- a/project/FiloBuild.scala +++ b/project/FiloBuild.scala @@ -152,7 +152,7 @@ object FiloBuild extends Build { .settings(libraryDependencies ++= gatewayDeps) .settings(gatewayAssemblySettings: _*) .dependsOn(coordinator % "compile->compile; test->test", - prometheus) + prometheus, cassandra) // Zookeeper pulls in slf4j-log4j12 which we DON'T want val excludeZK = ExclusionRule(organization = "org.apache.zookeeper") diff --git a/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala b/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala index 332a4d7f4b..e074600949 100644 --- a/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala +++ b/query/src/main/scala/filodb/query/exec/AggrOverRangeVectors.scala @@ -5,6 +5,7 @@ import java.nio.ByteBuffer import scala.collection.mutable import com.tdunning.math.stats.{ArrayDigest, TDigest} +import com.typesafe.scalalogging.StrictLogging import monix.reactive.Observable import filodb.core.binaryrecord2.RecordBuilder @@ -111,7 +112,7 @@ final case class AggregatePresenter(aggrOp: AggregationOperator, * * This singleton is the facade for the above operations. */ -object RangeVectorAggregator { +object RangeVectorAggregator extends StrictLogging { /** * This method is the facade for map and reduce steps of the aggregation. @@ -146,6 +147,7 @@ object RangeVectorAggregator { rowAgg: RowAggregator, skipMapPhase: Boolean, grouping: RangeVector => RangeVectorKey): Map[RangeVectorKey, Iterator[rowAgg.AggHolderType]] = { + logger.trace(s"mapReduceInternal on ${rvs.size} RangeVectors...") var acc = rowAgg.zero val mapInto = rowAgg.newRowToMapInto rvs.groupBy(grouping).mapValues { rvs => @@ -320,7 +322,8 @@ object SumRowAggregator extends RowAggregator { object HistSumRowAggregator extends RowAggregator { import filodb.memory.format.{vectors => bv} - class HistSumHolder(var timestamp: Long = 0L, var h: bv.Histogram = bv.Histogram.empty) extends AggregateHolder { + class HistSumHolder(var timestamp: Long = 0L, + var h: bv.MutableHistogram = bv.Histogram.empty) extends AggregateHolder { val row = new TransientHistRow() def toRowReader: MutableRowReader = { row.setValues(timestamp, h); row } def resetToZero(): Unit = h = bv.Histogram.empty @@ -331,10 +334,12 @@ object HistSumRowAggregator extends RowAggregator { def map(rvk: RangeVectorKey, item: RowReader, mapInto: MutableRowReader): RowReader = item def reduceAggregate(acc: HistSumHolder, aggRes: RowReader): HistSumHolder = { acc.timestamp = aggRes.getLong(0) + val newHist = aggRes.getHistogram(1) acc.h match { // sum is mutable histogram, copy to be sure it's our own copy - case hist if hist.numBuckets == 0 => acc.h = bv.MutableHistogram(aggRes.getHistogram(1)) - case hist: bv.MutableHistogram => hist.add(aggRes.getHistogram(1).asInstanceOf[bv.HistogramWithBuckets]) + case hist if hist.numBuckets == 0 => acc.h = bv.MutableHistogram(newHist) + case h if newHist.numBuckets > 0 => acc.h.add(newHist.asInstanceOf[bv.HistogramWithBuckets]) + case h => } acc } diff --git a/query/src/main/scala/filodb/query/exec/ExecPlan.scala b/query/src/main/scala/filodb/query/exec/ExecPlan.scala index fe4e1be5df..1d0eb59ac3 100644 --- a/query/src/main/scala/filodb/query/exec/ExecPlan.scala +++ b/query/src/main/scala/filodb/query/exec/ExecPlan.scala @@ -98,17 +98,21 @@ trait ExecPlan extends QueryCommand { queryConfig: QueryConfig) (implicit sched: Scheduler, timeout: FiniteDuration): Task[QueryResponse] = { - try { - qLogger.debug(s"queryId: ${id} Started ExecPlan ${getClass.getSimpleName} with $args") + // NOTE: we launch the preparatory steps as a Task too. This is important because scanPartitions, + // Lucene index lookup, and On-Demand Paging orchestration work could suck up nontrivial time and + // we don't want these to happen in a single thread. + Task { + qLogger.debug(s"queryId: ${id} Setting up ExecPlan ${getClass.getSimpleName} with $args") val res = doExecute(source, dataset, queryConfig) val schema = schemaOfDoExecute(dataset) val finalRes = rangeVectorTransformers.foldLeft((res, schema)) { (acc, transf) => - qLogger.debug(s"queryId: ${id} Started Transformer ${transf.getClass.getSimpleName} with ${transf.args}") + qLogger.debug(s"queryId: ${id} Setting up Transformer ${transf.getClass.getSimpleName} with ${transf.args}") (transf.apply(acc._1, queryConfig, limit, acc._2), transf.schema(dataset, acc._2)) } val recSchema = SerializableRangeVector.toSchema(finalRes._2.columns, finalRes._2.brSchemas) val builder = SerializableRangeVector.toBuilder(recSchema) var numResultSamples = 0 // BEWARE - do not modify concurrently!! + qLogger.debug(s"queryId: ${id} Materializing SRVs from iterators if necessary") finalRes._1 .map { case srv: SerializableRangeVector => @@ -148,9 +152,10 @@ trait ExecPlan extends QueryCommand { qLogger.error(s"queryId: ${id} Exception during execution of query: ${printTree(false)}", ex) QueryError(id, ex) } - } catch { case NonFatal(ex) => + }.flatten + .onErrorRecover { case NonFatal(ex) => qLogger.error(s"queryId: ${id} Exception during orchestration of query: ${printTree(false)}", ex) - Task(QueryError(id, ex)) + QueryError(id, ex) } } @@ -257,6 +262,10 @@ abstract class NonLeafExecPlan extends ExecPlan { /** * Sub-class non-leaf nodes should provide their own implementation of how * to compose the sub-query results here. + * + * @param childResponses observable of a pair. First element of pair is the QueryResponse for + * a child ExecPlan, the second element is the index of the child plan. + * There is one response per child plan. */ protected def compose(dataset: Dataset, childResponses: Observable[(QueryResponse, Int)], diff --git a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala index 9a382d1491..de0c9cae83 100644 --- a/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala +++ b/query/src/main/scala/filodb/query/exec/PeriodicSamplesMapper.scala @@ -11,6 +11,7 @@ import filodb.core.store.{ChunkSetInfo, WindowedChunkIterator} import filodb.memory.format.{vectors => bv, RowReader} import filodb.query.{BadQueryException, Query, QueryConfig, RangeFunctionId} import filodb.query.exec.rangefn.{ChunkedRangeFunction, RangeFunction, Window} +import filodb.query.Query.qLogger import filodb.query.util.IndexedArrayQueue /** @@ -62,6 +63,7 @@ final case class PeriodicSamplesMapper(start: Long, } case c: ChunkedRangeFunction[_] => source.map { rv => + qLogger.trace(s"Creating ChunkedWindowIterator for rv=${rv.key}, step=$step windowLength=$windowLength") IteratorBackedRangeVector(rv.key, new ChunkedWindowIteratorD(rv.asInstanceOf[RawDataRangeVector], start, step, end, windowLength, rangeFuncGen().asChunkedD, queryConfig)) @@ -142,7 +144,7 @@ extends Iterator[R] with StrictLogging { case e: Exception => val timestampVector = nextInfo.vectorPtr(rv.timestampColID) val tsReader = bv.LongBinaryVector(timestampVector) - logger.error(s"addChunks Exception: info.numRows=${nextInfo.numRows} " + + qLogger.error(s"addChunks Exception: info.numRows=${nextInfo.numRows} " + s"info.endTime=${nextInfo.endTime} curWindowEnd=${wit.curWindowEnd} " + s"tsReader=$tsReader timestampVectorLength=${tsReader.length(timestampVector)}") throw e diff --git a/query/src/main/scala/filodb/query/exec/TransientRow.scala b/query/src/main/scala/filodb/query/exec/TransientRow.scala index adf3bd7da5..0a8aeb9b7b 100644 --- a/query/src/main/scala/filodb/query/exec/TransientRow.scala +++ b/query/src/main/scala/filodb/query/exec/TransientRow.scala @@ -56,8 +56,8 @@ final class TransientRow(var timestamp: Long, var value: Double) extends Mutable } final class TransientHistRow(var timestamp: Long = 0L, - var value: bv.Histogram = bv.Histogram.empty) extends MutableRowReader { - def setValues(ts: Long, hist: bv.Histogram): Unit = { + var value: bv.HistogramWithBuckets = bv.Histogram.empty) extends MutableRowReader { + def setValues(ts: Long, hist: bv.HistogramWithBuckets): Unit = { timestamp = ts value = hist } diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala index 27750e67a9..2d71c8f0c2 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala @@ -166,7 +166,7 @@ class SumOverTimeChunkedFunctionL extends SumOverTimeChunkedFunction() with Chun } } -class SumOverTimeChunkedFunctionH(var h: bv.Histogram = bv.Histogram.empty) +class SumOverTimeChunkedFunctionH(var h: bv.MutableHistogram = bv.Histogram.empty) extends ChunkedRangeFunction[TransientHistRow] { override final def reset(): Unit = { h = bv.Histogram.empty } final def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = { diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala index a767c35ea4..2520f5ff1d 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala @@ -315,7 +315,7 @@ extends LastSampleChunkedFunction[TransientRow] { } // LastSample function for Histogram columns -class LastSampleChunkedFunctionH(var value: bv.Histogram = bv.Histogram.empty) +class LastSampleChunkedFunctionH(var value: bv.HistogramWithBuckets = bv.Histogram.empty) extends LastSampleChunkedFunction[TransientHistRow] { override final def reset(): Unit = { timestamp = -1L; value = bv.Histogram.empty } final def apply(endTimestamp: Long, sampleToEmit: TransientHistRow): Unit = { diff --git a/query/src/test/scala/filodb/query/exec/AggrOverRangeVectorsSpec.scala b/query/src/test/scala/filodb/query/exec/AggrOverRangeVectorsSpec.scala index 65c6ee5cae..f926d00118 100644 --- a/query/src/test/scala/filodb/query/exec/AggrOverRangeVectorsSpec.scala +++ b/query/src/test/scala/filodb/query/exec/AggrOverRangeVectorsSpec.scala @@ -344,7 +344,9 @@ class AggrOverRangeVectorsSpec extends RawDataWindowingSpec with ScalaFutures { val samples: Array[RangeVector] = Array(rv1, rv2) val agg1 = RowAggregator(AggregationOperator.Sum, Nil, ColumnType.HistogramColumn) - val resultObs = RangeVectorAggregator.mapReduce(agg1, false, Observable.fromIterable(samples), noGrouping) + val resultObs1 = RangeVectorAggregator.mapReduce(agg1, false, Observable.fromIterable(samples), noGrouping) + val resultObs = RangeVectorAggregator.mapReduce(agg1, true, resultObs1, rv=>rv.key) + val result = resultObs.toListL.runAsync.futureValue result.size shouldEqual 1 result(0).key shouldEqual noKey @@ -356,6 +358,14 @@ class AggrOverRangeVectorsSpec extends RawDataWindowingSpec with ScalaFutures { }.toList result(0).rows.map(_.getHistogram(1)).toList shouldEqual sums + + // Test mapReduce of empty histogram sums + val agg2 = RowAggregator(AggregationOperator.Sum, Nil, ColumnType.HistogramColumn) + val emptyObs = RangeVectorAggregator.mapReduce(agg2, false, Observable.empty, noGrouping) + val resultObs2 = RangeVectorAggregator.mapReduce(agg2, true, emptyObs ++ resultObs1, rv=>rv.key) + val result2 = resultObs2.toListL.runAsync.futureValue + result2.size shouldEqual 1 + result2(0).key shouldEqual noKey } @tailrec