From 961cfbcce8c90cdf745eee933668925f3de682f7 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Tue, 7 Jul 2020 23:22:43 -0700
Subject: [PATCH] bug(cassandra): Read from ingestion index table had duplicates (#812)

A scan on the ingestion index table returns duplicate partition keys,
since the table holds one row per matching chunk. Remove the duplicates
from the query result before batching.
---
 .../columnstore/CassandraColumnStore.scala     |  2 --
 .../columnstore/IngestionTimeIndexTable.scala  |  3 +--
 .../columnstore/CassandraColumnStoreSpec.scala | 12 ++++++++----
 core/src/main/resources/filodb-defaults.conf   |  2 +-
 4 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
index 012cca0d99..ec2449967a 100644
--- a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
@@ -204,8 +204,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
       case split => throw new UnsupportedOperationException(s"Unknown split type $split seen")
     }
 
-    import filodb.core.Iterators._
-
     val chunksTable = getOrCreateChunkTable(datasetRef)
     partKeys.sliding(batchSize, batchSize).map { parts =>
       logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " +
diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala
index 4ae4bdfb03..704640df3e 100644
--- a/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/IngestionTimeIndexTable.scala
@@ -113,8 +113,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
                          end.toLong: java.lang.Long,
                          ingestionTimeStart: java.lang.Long,
                          ingestionTimeEnd: java.lang.Long)
-    session.execute(stmt).iterator.asScala
-      .map { row => row.getBytes("partition") }
+    session.execute(stmt).asScala.map { row => row.getBytes("partition") }.toSet.iterator
   }
 }
 
diff --git a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala
index 99c60fb42e..c1318173bb 100644
--- a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala
+++ b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraColumnStoreSpec.scala
@@ -247,9 +247,12 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
     val firstSampleTime = 74373042000L
     val partBuilder = new RecordBuilder(nativeMemoryManager)
     val ingestTime = 1594130687316L
-    val rows = Seq(TupleRowReader((Some(firstSampleTime), Some(0.0d))))
-    val chunksets = for { i <- 0 until 1050 } yield {
-      val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
+    val chunksets = for {
+      i <- 0 until 1050
+      partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
+      c <- 0 until 3
+    } yield {
+      val rows = Seq(TupleRowReader((Some(firstSampleTime + c), Some(0.0d))))
       ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager)
     }
     colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue
@@ -260,7 +263,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
       ingestTime - 1,
       ingestTime + 1,
       firstSampleTime - 1,
-      firstSampleTime + 1,
+      firstSampleTime + 5,
       10L,
       100
     ).toList
@@ -268,6 +271,7 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
     batches.size shouldEqual 11 // 100 rows per batch, 1050 rows => 11 batches
     batches.zipWithIndex.foreach { case (b, i) =>
       b.size shouldEqual (if (i == 10) 50 else 100)
+      b.foreach(_.chunkSetsTimeOrdered.size shouldEqual 3)
     }
   }
 }
diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index c7d1a31a4b..3c313e8d94 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -233,7 +233,7 @@ filodb {
   # cass-session-provider-fqcn = fqcn
 
   # Number of time series to operate on at one time. Reduce if there is much less memory available
-  cass-write-batch-size = 500
+  cass-write-batch-size = 250
 
   # amount of parallelism to introduce in the spark job. This controls number of spark partitions
   # increase if the number of splits seen in cassandra reads is low and spark jobs are slow.
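
For illustration, below is a minimal standalone Scala sketch (not part of
the patch) of the bug and the fix: because the ingestion index table keeps
one row per chunk, a scan returns the same partition key once per chunk,
so batching the raw rows would inflate the partition count. The object
name, key encoding, and counts are hypothetical stand-ins; only the
.toSet.iterator de-duplication and the sliding(batchSize, batchSize)
batching mirror the patch above.

    import java.nio.ByteBuffer

    object DedupBeforeBatching extends App {
      // Simulate the index scan: 5 partitions x 3 chunks each = 15 rows,
      // with each partition key repeated once per chunk.
      val scanRows: Iterator[ByteBuffer] =
        (0 until 5).iterator.flatMap { p =>
          Iterator.fill(3)(ByteBuffer.wrap(Array(p.toByte)))
        }

      // The fix: collapse duplicates before batching. ByteBuffer defines
      // equals/hashCode over its contents, so toSet de-duplicates by key
      // bytes. Note that toSet materializes the scan and drops ordering.
      val partKeys: Iterator[ByteBuffer] = scanRows.toSet.iterator

      // Batching as in CassandraColumnStore: non-overlapping windows.
      val batchSize = 2
      val batches = partKeys.sliding(batchSize, batchSize).toList
      println(batches.map(_.size))  // List(2, 2, 1): 5 unique keys, not 15 rows
    }

Without the .toSet, the same sketch prints List(2, 2, 2, 2, 2, 2, 2, 1):
eight batches over 15 rows instead of three over 5 unique keys, the same
over-counting the updated spec guards against by writing three chunks per
series and still expecting 11 batches. The eager toSet presumably stays
bounded because each query covers a single token-range split and
ingestion-time window.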