From 432df06a87be6d656a201c0502eaeabd6e3350e2 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Fri, 12 Apr 2019 11:06:38 -0700
Subject: [PATCH 1/4] bug(core): startTimes Lucene query needs batching (#317)

There is a limit to the number of terms in a query, so we need batching
to prevent an exception when the startTime for too many partIds needs to
be fetched.
---
 .../filodb.core/memstore/PartKeyLuceneIndex.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
index 282c62b315..bff65b9505 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -326,11 +326,13 @@ class PartKeyLuceneIndex(dataset: Dataset,
    */
   def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long] = {
     val collector = new PartIdStartTimeCollector()
-    val booleanQuery = new BooleanQuery.Builder
-    partIds.foreach { pId =>
-      booleanQuery.add(new TermQuery(new Term(PART_ID, pId.toString)), Occur.SHOULD)
+    partIds.grouped(512).map { batch => // limit on query clause count is 1024, hence batch
+      val booleanQuery = new BooleanQuery.Builder
+      batch.foreach { pId =>
+        booleanQuery.add(new TermQuery(new Term(PART_ID, pId.toString)), Occur.SHOULD)
+      }
+      searcherManager.acquire().search(booleanQuery.build(), collector)
     }
-    searcherManager.acquire().search(booleanQuery.build(), collector)
     collector.startTimes
   }
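For context on the limit this patch works around: Lucene refuses to grow a
BooleanQuery past BooleanQuery.getMaxClauseCount() clauses (1024 by default)
and throws BooleanQuery.TooManyClauses. A minimal standalone sketch of the
failure mode (assuming a Lucene 7.x-era API, where the limit still lives on
BooleanQuery, and using an illustrative field name rather than the real
PART_ID value):

    import org.apache.lucene.index.Term
    import org.apache.lucene.search.{BooleanClause, BooleanQuery, TermQuery}

    object ClauseLimitDemo extends App {
      val builder = new BooleanQuery.Builder
      try {
        // one clause more than the default limit of 1024
        (0 to BooleanQuery.getMaxClauseCount).foreach { i =>
          builder.add(new TermQuery(new Term("partId", i.toString)), BooleanClause.Occur.SHOULD)
        }
      } catch {
        case e: BooleanQuery.TooManyClauses =>
          println(s"hit the clause limit: ${e.getMessage}") // why partIds must be batched
      }
    }

Batching at 512 keeps every per-batch query comfortably under that limit.
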
From f7877b359661037011613065e20abf16b1945e89 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Tue, 16 Apr 2019 10:29:43 -0700
Subject: [PATCH 2/4] bug(core): fix startTimeFromPartIds regression; disable flaky unit tests (#321)

---
 .../scala/filodb.coordinator/NodeClusterSpec.scala       | 8 +++++---
 .../scala/filodb.coordinator/FilodbClusterNodeSpec.scala | 3 +++
 .../scala/filodb.coordinator/IngestionStreamSpec.scala   | 4 +++-
 .../filodb.coordinator/NodeCoordinatorActorSpec.scala    | 4 +++-
 .../scala/filodb.core/memstore/PartKeyLuceneIndex.scala  | 2 +-
 5 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/coordinator/src/multi-jvm/scala/filodb.coordinator/NodeClusterSpec.scala b/coordinator/src/multi-jvm/scala/filodb.coordinator/NodeClusterSpec.scala
index 315eaf44c4..b7ccf67b47 100644
--- a/coordinator/src/multi-jvm/scala/filodb.coordinator/NodeClusterSpec.scala
+++ b/coordinator/src/multi-jvm/scala/filodb.coordinator/NodeClusterSpec.scala
@@ -4,6 +4,7 @@ import scala.concurrent.duration._
 
 import akka.actor.ActorRef
 import akka.remote.testkit.MultiNodeConfig
+import org.scalatest.Ignore
 // import akka.remote.transport.ThrottlerTransportAdapter.Direction.Both
 import com.typesafe.config.ConfigFactory
@@ -245,6 +246,7 @@ abstract class NodeClusterSpec extends ClusterSpec(NodeClusterSpecConfig) {
   }
 }
-class NodeClusterSpecMultiJvmNode1 extends NodeClusterSpec
-class NodeClusterSpecMultiJvmNode2 extends NodeClusterSpec
-class NodeClusterSpecMultiJvmNode3 extends NodeClusterSpec
\ No newline at end of file
+// TODO disabling flaky (on Travis) tests until fixed and made reliable
+@Ignore class NodeClusterSpecMultiJvmNode1 extends NodeClusterSpec
+@Ignore class NodeClusterSpecMultiJvmNode2 extends NodeClusterSpec
+@Ignore class NodeClusterSpecMultiJvmNode3 extends NodeClusterSpec
\ No newline at end of file

diff --git a/coordinator/src/test/scala/filodb.coordinator/FilodbClusterNodeSpec.scala b/coordinator/src/test/scala/filodb.coordinator/FilodbClusterNodeSpec.scala
index 3491301513..01b615cfd2 100644
--- a/coordinator/src/test/scala/filodb.coordinator/FilodbClusterNodeSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/FilodbClusterNodeSpec.scala
@@ -6,6 +6,7 @@ import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Millis, Seconds, Span}
+import org.scalatest.Ignore
 
 import filodb.coordinator.client.MiscCommands
 import filodb.core.{AbstractSpec, Success}
@@ -113,6 +114,8 @@ class ClusterNodeExecutorSpec extends FilodbClusterNodeSpec {
   }
 }
 
+// TODO disabled since several tests in this class are flaky in Travis.
+@Ignore
 class ClusterNodeServerSpec extends FilodbClusterNodeSpec {
   override val role = ClusterRole.Server

diff --git a/coordinator/src/test/scala/filodb.coordinator/IngestionStreamSpec.scala b/coordinator/src/test/scala/filodb.coordinator/IngestionStreamSpec.scala
index 0748692a64..d4385e6c8f 100644
--- a/coordinator/src/test/scala/filodb.coordinator/IngestionStreamSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/IngestionStreamSpec.scala
@@ -4,7 +4,7 @@ import scala.concurrent.duration._
 
 import com.typesafe.config.ConfigFactory
 import com.typesafe.scalalogging.StrictLogging
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest.{BeforeAndAfterEach, Ignore}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Millis, Seconds, Span}
 
@@ -16,6 +16,8 @@ object IngestionStreamSpec extends ActorSpecConfig
 // This is really an end to end ingestion test, it's what a client talking to a FiloDB node would do.
 // Most of the tests use the automated DatasetSetup where the coordinators set up the IngestionStream, but
 // some set them up manually by invoking the factories directly.
+// TODO disabled since this test is flaky in Travis.
+@Ignore
 class IngestionStreamSpec extends ActorTest(IngestionStreamSpec.getNewSystem) with StrictLogging
   with ScalaFutures with BeforeAndAfterEach {

diff --git a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
index 011ea417d4..99be9921c2 100644
--- a/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/NodeCoordinatorActorSpec.scala
@@ -8,7 +8,7 @@ import akka.actor.{Actor, ActorRef, AddressFromURIString, PoisonPill, Props}
 import akka.pattern.gracefulStop
 import akka.util.Timeout
 import com.typesafe.config.ConfigFactory
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest.{BeforeAndAfterEach, Ignore}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.time.{Millis, Seconds, Span}
 
@@ -22,6 +22,8 @@ import filodb.prometheus.parse.Parser
 object NodeCoordinatorActorSpec extends ActorSpecConfig
 
 // This is really an end to end ingestion test, it's what a client talking to a FiloDB node would do
+// TODO disabled since several tests in this class are flaky in Travis.
+@Ignore
 class NodeCoordinatorActorSpec extends ActorTest(NodeCoordinatorActorSpec.getNewSystem) with ScalaFutures
   with BeforeAndAfterEach {

diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
index bff65b9505..d157d97d57 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -326,7 +326,7 @@ class PartKeyLuceneIndex(dataset: Dataset,
    */
   def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long] = {
     val collector = new PartIdStartTimeCollector()
-    partIds.grouped(512).map { batch => // limit on query clause count is 1024, hence batch
+    partIds.grouped(512).foreach { batch => // limit on query clause count is 1024, hence batch
       val booleanQuery = new BooleanQuery.Builder
       batch.foreach { pId =>
         booleanQuery.add(new TermQuery(new Term(PART_ID, pId.toString)), Occur.SHOULD)
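The regression fixed above is a standard Scala iterator pitfall, worth spelling
out: map on an Iterator is lazy, so in PATCH 1/4 the per-batch searches never
ran because nothing consumed the mapped iterator; foreach is eager. A
self-contained sketch (the `searches` counter and `search` stand-in are
illustrative, not FiloDB code):

    object LazyMapGotcha extends App {
      var searches = 0
      def search(batch: Seq[Int]): Unit = searches += 1 // stand-in for the per-batch Lucene search

      Iterator.range(0, 2000).grouped(512).map(search)  // lazy: result iterator never consumed
      println(searches)                                 // prints 0, the PATCH 1/4 bug

      Iterator.range(0, 2000).grouped(512).foreach(search) // eager: runs once per batch
      println(searches)                                    // prints 4 (batches of 512, 512, 512, 464)
    }

That is why the fix only needed to change map to foreach.
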
From 64e59d05d42482da8dd73e7479f4e2fa8daeafca Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Thu, 18 Apr 2019 15:11:19 -0700
Subject: [PATCH 3/4] test(core): Unit test for Lucene startTimes API for multiple partIds (#325)

* also added a span to measure the Lucene startTimes API during ODP
---
 .../memstore/PartKeyLuceneIndex.scala     |  5 +++++
 .../memstore/PartKeyLuceneIndexSpec.scala | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
index d157d97d57..c9a943d9ad 100644
--- a/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
+++ b/core/src/main/scala/filodb.core/memstore/PartKeyLuceneIndex.scala
@@ -325,6 +325,10 @@ class PartKeyLuceneIndex(dataset: Dataset,
    * Called when a document is updated with new endTime
    */
   def startTimeFromPartIds(partIds: Iterator[Int]): debox.Map[Int, Long] = {
+    val span = Kamon.buildSpan("index-startTimes-for-odp-lookup-latency")
+      .withTag("dataset", dataset.name)
+      .withTag("shard", shardNum)
+      .start()
     val collector = new PartIdStartTimeCollector()
     partIds.grouped(512).foreach { batch => // limit on query clause count is 1024, hence batch
       val booleanQuery = new BooleanQuery.Builder
       batch.foreach { pId =>
         booleanQuery.add(new TermQuery(new Term(PART_ID, pId.toString)), Occur.SHOULD)
       }
       searcherManager.acquire().search(booleanQuery.build(), collector)
     }
+    span.finish()
     collector.startTimes
   }

diff --git a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
index c0098ec0da..ba32d5e18c 100644
--- a/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/PartKeyLuceneIndexSpec.scala
@@ -103,6 +103,23 @@ class PartKeyLuceneIndexSpec extends FunSpec with Matchers with BeforeAndAfter {
 
   }
 
+  it("should add part keys and fetch startTimes correctly") {
+    val numPartIds = 3000 // needs to be more than 1024 to test the Lucene term limit
+    val start = System.currentTimeMillis()
+    // we don't care much about the partKey here, only the startTime stored against each partId
+    val partKeys = Stream.continually(readers.head).take(numPartIds).toList
+    partKeyFromRecords(dataset6, records(dataset6, partKeys), Some(partBuilder))
+      .zipWithIndex.foreach { case (addr, i) =>
+        keyIndex.addPartKey(partKeyOnHeap(dataset6, ZeroPointer, addr), i, start + i)()
+      }
+    keyIndex.commitBlocking()
+
+    val startTimes = keyIndex.startTimeFromPartIds((0 until numPartIds).iterator)
+    for { i <- 0 until numPartIds } {
+      startTimes(i) shouldEqual start + i
+    }
+  }
+
   it("should update part keys with endtime and parse filters correctly") {
     val start = System.currentTimeMillis()
     // Add the first ten keys and row numbers
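The span added above follows Kamon's buildSpan / withTag / start / finish
sequence. A small reusable helper in the same style (a sketch against the
Kamon 1.x-style API the diff itself uses; the name `timed` and its signature
are illustrative, not FiloDB code):

    import kamon.Kamon

    object SpanTiming {
      def timed[T](operation: String, tags: (String, String)*)(block: => T): T = {
        val span = tags.foldLeft(Kamon.buildSpan(operation)) {
          case (builder, (key, value)) => builder.withTag(key, value)
        }.start()
        try block finally span.finish() // closes the span even if block throws
      }
    }

    // usage, mirroring the tags in the patch (dsName and shard stand in for
    // dataset.name and shardNum):
    // timed("index-startTimes-for-odp-lookup-latency",
    //       "dataset" -> dsName, "shard" -> shard.toString) {
    //   startTimeFromPartIds(partIds)
    // }

One difference worth noting: the patch calls span.finish() only on the success
path, so a throwing search would leave the span open; the try/finally above
closes it either way.
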
From 11f553699d3a20ee44efce9f5c792eb8a79b3974 Mon Sep 17 00:00:00 2001
From: Shaik Sher Ali
Date: Wed, 24 Apr 2019 10:36:14 -0700
Subject: [PATCH 4/4] update version to 0.8.4.1

---
 version.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/version.sbt b/version.sbt
index ca68fcbd5f..474d9bd629 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.8.4"
+version in ThisBuild := "0.8.4.1"