diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala
index 9d7cb89930..25b1bd408a 100644
--- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala
+++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala
@@ -79,15 +79,17 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
   lazy val exportLatency =
     Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags()
 
+  lazy val numRowsExported = Kamon.counter("num-rows-exported").withoutTags()
+
   /**
    * Exports an RDD for a specific export key.
-   *  (1) Create the export destination address from the key.
-   *  (2) Generate all rows to be exported from the argument RDD.
-   *  (3) Filter for only rows relevant to the export key.
-   *  (4) Write the filtered rows to the destination address.
+   *   (1) Create the export destination address from the key.
+   *   (2) Generate all rows to be exported from the argument RDD.
+   *   (3) Filter for only rows relevant to the export key.
+   *   (4) Write the filtered rows to the destination address.
    * NOTE: the export schema is required to define a column for each field of an export key.
-   *  E.g. if a key is defined as [foo, bar], then the result rows must contain a column
-   *  for each of "foo" and "bar".
+   *   E.g. if a key is defined as [foo, bar], then the result rows must contain a column
+   *   for each of "foo" and "bar".
    */
   private def exportForKey(rdd: RDD[Seq[PagedReadablePartition]],
                            exportKey: Seq[String],
@@ -101,9 +103,12 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
       index.get
     })
 
-    val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_, exportTableConfig)).filter{ row =>
+    val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_, exportTableConfig)).filter { row =>
       val rowKey = columnKeyIndices.map(row.get(_).toString)
       rowKey == exportKey
+    }.map { row =>
+      numRowsExported.increment()
+      row
     }
 
     // write filteredRowRdd to iceberg table
@@ -121,10 +126,10 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
 
   def run(sparkConf: SparkConf): SparkSession = {
     val spark = Class.forName(settings.sparkSessionFactoryClass)
-        .getDeclaredConstructor()
-        .newInstance()
-        .asInstanceOf[SparkSessionFactory]
-        .make(sparkConf)
+      .getDeclaredConstructor()
+      .newInstance()
+      .asInstanceOf[SparkSessionFactory]
+      .make(sparkConf)
 
     DownsamplerContext.dsLogger.info(s"Spark Job Properties: ${spark.sparkContext.getConf.toDebugString}")
 
@@ -134,11 +139,11 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
     // Specified during reruns for downsampling old data
     val userTimeInPeriod: Long = spark.sparkContext.getConf
       .getOption("spark.filodb.downsampler.userTimeOverride") match {
-        // by default assume a time in the previous downsample period
-        case None => System.currentTimeMillis() - settings.downsampleChunkDuration
-        // examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
-        case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
-        }
+      // by default assume a time in the previous downsample period
+      case None => System.currentTimeMillis() - settings.downsampleChunkDuration
+      // examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
+      case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
+    }
     val userTimeStart: Long = (userTimeInPeriod / settings.downsampleChunkDuration) *
       settings.downsampleChunkDuration
     val userTimeEndExclusive: Long = userTimeStart + settings.downsampleChunkDuration
@@ -185,7 +190,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
         Kamon.init()
         KamonShutdownHook.registerShutdownHook()
         // convert each RawPartData to a ReadablePartition
-        rawPartsBatch.map{ rawPart =>
+        rawPartsBatch.map { rawPart =>
           val rawSchemaId = RecordSchema.schemaID(rawPart.partitionKey, UnsafeUtils.arayOffset)
           val rawPartSchema = batchDownsampler.schemas(rawSchemaId)
           new PagedReadablePartition(rawPartSchema, shard = 0, partID = 0, partData = rawPart, minResolutionMs = 1)
@@ -195,7 +200,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
     val rddWithDs = if (settings.chunkDownsamplerIsEnabled) {
       // Downsample the data.
       // NOTE: this step returns each row of the RDD as-is; it does NOT return downsampled data.
-      rdd.map{ part =>
+      rdd.map { part =>
         batchDownsampler.downsampleBatch(part)
         part
       }
@@ -211,8 +216,10 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
     val firstExportTaskWithDs = Seq(() => exportForKey(rddWithDs, exportKeyToRules.head._1,
       exportKeyToRules.head._2, batchExporter, spark))
     // export all remaining keys without the downsample step
-    val remainingExportTasksWithoutDs = exportKeyToRules.tail.map{spec => () =>
-      exportForKey(rdd, spec._1, spec._2, batchExporter, spark)}
+    val remainingExportTasksWithoutDs = exportKeyToRules.tail.map { spec =>
+      () =>
+        exportForKey(rdd, spec._1, spec._2, batchExporter, spark)
+    }
     // create a parallel sequence of tasks
     (firstExportTaskWithDs ++ remainingExportTasksWithoutDs).par
   }
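
The substantive change in the first two hunks is the new `num-rows-exported` Kamon counter, bumped by a pass-through `.map` on the RDD after the export-key filter. Below is a minimal sketch of that counting pattern, not part of the PR: it assumes a local SparkSession and toy rows, and swaps the Kamon counter for a Spark `LongAccumulator` so it runs without a metrics backend.

```scala
// Illustrative sketch only: RowCountSketch, toyRows, and exportKey are hypothetical names.
// A LongAccumulator stands in for numRowsExported.increment() so no Kamon setup is needed.
import org.apache.spark.sql.SparkSession

object RowCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("row-count-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for the rows produced by batchExporter.getExportRows.
    val toyRows = sc.parallelize(Seq(
      Seq("ns1", "metricA"), Seq("ns2", "metricB"), Seq("ns1", "metricC")))
    val exportKey = Seq("ns1")

    // Accumulator in place of the Kamon counter.
    val numRowsExported = sc.longAccumulator("num-rows-exported")

    val filteredRowRdd = toyRows
      .filter { row => row.take(exportKey.size) == exportKey } // keep only rows for this export key
      .map { row => numRowsExported.add(1L); row }             // pass-through count, as in the PR

    // Transformations are lazy; the count is populated only after an action runs.
    filteredRowRdd.collect()
    println(s"rows exported: ${numRowsExported.value}")        // prints 2

    spark.stop()
  }
}
```

Because the increment sits inside an RDD transformation, it only executes when an action materializes the RDD, and it fires once per surviving row per evaluation.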
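The `run()` hunks also show how an optional `spark.filodb.downsampler.userTimeOverride` timestamp is parsed and floored to the start of its downsample chunk. The standalone sketch below reproduces that arithmetic; the 6-hour chunk duration is a hypothetical stand-in for `settings.downsampleChunkDuration`.

```scala
// Standalone sketch of the period-alignment arithmetic in run().
import java.time.Instant
import java.time.format.DateTimeFormatter

object PeriodAlignSketch {
  def main(args: Array[String]): Unit = {
    val downsampleChunkDuration: Long = 6L * 60 * 60 * 1000  // hypothetical 6h chunk, in ms

    // Same parsing as the userTimeOverride branch; accepts e.g.
    // 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00.
    val userTimeInPeriod: Long =
      Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse("2019-10-20T12:34:56Z")).toEpochMilli()

    // Integer division floors the timestamp to the chunk that contains it.
    val userTimeStart = (userTimeInPeriod / downsampleChunkDuration) * downsampleChunkDuration
    val userTimeEndExclusive = userTimeStart + downsampleChunkDuration

    // 2019-10-20T12:34:56Z falls in the 12:00-18:00 UTC chunk, so userTimeStart
    // is the epoch millis of 2019-10-20T12:00:00Z.
    println(s"userTimeStart=$userTimeStart userTimeEndExclusive=$userTimeEndExclusive")
  }
}
```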
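Finally, the last hunk reshapes the per-key export tasks into a sequence of zero-argument thunks that is turned into a parallel collection with `.par`. The sketch below illustrates that task-parallel pattern with hypothetical names (`doExport`, `keys`); it assumes Scala 2.12, where `.par` is available without the separate `scala-parallel-collections` module, and the invocation of the resulting `ParSeq` is outside this diff.

```scala
// Illustrative sketch of building thunks per export key and running them in parallel.
object ParallelTasksSketch {
  def main(args: Array[String]): Unit = {
    def doExport(key: Seq[String]): Unit =
      println(s"exporting key=$key on ${Thread.currentThread().getName}")

    val keys = Seq(Seq("ns1"), Seq("ns2"), Seq("ns3"))

    // Build the thunks first so no work happens until they are invoked.
    val tasks: Seq[() => Unit] = keys.map { key => () => doExport(key) }

    // .par gives a parallel collection; foreach then invokes the thunks
    // concurrently on the default fork-join pool.
    tasks.par.foreach(task => task())
  }
}
```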