bug(cassandra): Read from ingestion index table had duplicates (#812)
Scanning the ingestion-time index table returns duplicate partition keys, since the table has one row per matching chunk. Remove the duplicates from the query result before batching.
vishramachandran authored Jul 8, 2020
1 parent 2600daa commit 961cfbc
Showing 4 changed files with 10 additions and 9 deletions.
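For context, the ingestion-time index table stores one row per chunk, so a time-range scan returns the same partition key once for every matching chunk. The sketch below uses plain Scala collections with made-up partition keys (not the Cassandra driver API) to illustrate the dedup idea the fix applies before batching:

object IndexScanDedupSketch extends App {
  // One index row per chunk: a partition with three chunks in the queried
  // ingestion-time range shows up three times in the scan result.
  val scannedPartKeys = Seq("p1", "p1", "p1", "p2", "p2")

  // Before the fix the raw rows were batched directly, duplicates included.
  val raw = scannedPartKeys
  // The fix collapses the result to distinct partition keys before batching,
  // mirroring the `.toSet.iterator` change in IngestionTimeIndexTable below.
  val deduped = scannedPartKeys.toSet.iterator.toSeq

  println(s"raw=${raw.size} deduped=${deduped.size}") // raw=5 deduped=2
}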
@@ -204,8 +204,6 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
       case split => throw new UnsupportedOperationException(s"Unknown split type $split seen")
     }

-    import filodb.core.Iterators._
-
     val chunksTable = getOrCreateChunkTable(datasetRef)
     partKeys.sliding(batchSize, batchSize).map { parts =>
       logger.debug(s"Querying cassandra for chunks from ${parts.size} partitions userTimeStart=$userTimeStart " +
@@ -113,8 +113,7 @@ sealed class IngestionTimeIndexTable(val dataset: DatasetRef,
                     end.toLong: java.lang.Long,
                     ingestionTimeStart: java.lang.Long,
                     ingestionTimeEnd: java.lang.Long)
-    session.execute(stmt).iterator.asScala
-      .map { row => row.getBytes("partition") }
+    session.execute(stmt).asScala.map { row => row.getBytes("partition") }.toSet.iterator
   }
 }
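The rewritten query collects the scanned partition keys into a Set before exposing them as an iterator again, so each partition appears once even when several of its chunks fall inside the queried ingestion-time window; the previous `iterator.asScala` form streamed every row through as-is. The dedup presumably materializes the scan result in memory, trading some extra allocation for distinct partition keys.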
@@ -247,9 +247,12 @@ class CassandraColumnStoreSpec extends ColumnStoreSpec {
     val firstSampleTime = 74373042000L
     val partBuilder = new RecordBuilder(nativeMemoryManager)
     val ingestTime = 1594130687316L
-    val rows = Seq(TupleRowReader((Some(firstSampleTime), Some(0.0d))))
-    val chunksets = for { i <- 0 until 1050 } yield {
-      val partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
+    val chunksets = for {
+      i <- 0 until 1050
+      partKey = partBuilder.partKeyFromObjects(Schemas.gauge, gaugeName + i, seriesTags)
+      c <- 0 until 3
+    } yield {
+      val rows = Seq(TupleRowReader((Some(firstSampleTime + c), Some(0.0d))))
       ChunkSet(Schemas.gauge.data, partKey, ingestTime, rows, nativeMemoryManager)
     }
     colStore.write(promDataset.ref, Observable.fromIterable(chunksets)).futureValue
@@ -260,14 +263,15 @@
       ingestTime - 1,
       ingestTime + 1,
       firstSampleTime - 1,
-      firstSampleTime + 1,
+      firstSampleTime + 5,
       10L,
       100
     ).toList

     batches.size shouldEqual 11 // 100 rows per batch, 1050 rows => 11 batches
     batches.zipWithIndex.foreach { case (b, i) =>
       b.size shouldEqual (if (i == 10) 50 else 100)
+      b.foreach(_.chunkSetsTimeOrdered.size shouldEqual 3)
     }
   }
 }
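The spec now writes three chunksets per partition (sample times firstSampleTime + 0, +1, +2 at the same ingestion time) instead of one, so each of the 1050 partitions has multiple rows in the index table for the queried range. Without the dedup, those extra rows would inflate the batch count well past the asserted 11 batches of at most 100 partitions, and the added `chunkSetsTimeOrdered.size shouldEqual 3` check confirms that all three chunks are still returned for each partition.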
core/src/main/resources/filodb-defaults.conf — 1 addition & 1 deletion
@@ -233,7 +233,7 @@ filodb {
     # cass-session-provider-fqcn = fqcn

     # Number of time series to operate on at one time. Reduce if there is much less memory available
-    cass-write-batch-size = 500
+    cass-write-batch-size = 250

     # amount of parallelism to introduce in the spark job. This controls number of spark partitions
     # increase if the number of splits seen in cassandra reads is low and spark jobs are slow.
