Commit
misc(iceberg-export): Include a metric to count number of rows exported (#1726)
nikitag55 authored Feb 29, 2024
1 parent 6cd6f96 commit b8df78c
Showing 1 changed file with 27 additions and 20 deletions.
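At its core, the change threads a Kamon counter through the export path: every row that survives the export-key filter bumps num-rows-exported before being handed to the writer. The following is a minimal, self-contained sketch of that pattern, not code from the repository; the object name, the countMatching helper, and the Seq[String] row type are illustrative assumptions.

import kamon.Kamon
import org.apache.spark.rdd.RDD

object RowCountingSketch {
  // Looked up lazily so the counter is registered on whichever JVM runs the task.
  lazy val numRowsExported = Kamon.counter("num-rows-exported").withoutTags()

  // Keep only rows addressed to the export key, count each survivor, and pass it through unchanged.
  def countMatching(rows: RDD[Seq[String]], exportKey: Seq[String]): RDD[Seq[String]] =
    rows
      .filter(_ == exportKey)
      .map { row =>
        numRowsExported.increment()   // side effect only; the row itself is not modified
        row
      }
}

Because the increment happens inside a Spark transformation, it executes on the executors rather than the driver, which lines up with the job calling Kamon.init() inside the partition-processing closure later in this diff.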
@@ -79,15 +79,17 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
lazy val exportLatency =
Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags()

+ lazy val numRowsExported = Kamon.counter("num-rows-exported").withoutTags()
+
/**
* Exports an RDD for a specific export key.
- * (1) Create the export destination address from the key.
- * (2) Generate all rows to be exported from the argument RDD.
- * (3) Filter for only rows relevant to the export key.
- * (4) Write the filtered rows to the destination address.
+ * (1) Create the export destination address from the key.
+ * (2) Generate all rows to be exported from the argument RDD.
+ * (3) Filter for only rows relevant to the export key.
+ * (4) Write the filtered rows to the destination address.
* NOTE: the export schema is required to define a column for each field of an export key.
- * E.g. if a key is defined as [foo, bar], then the result rows must contain a column
- * for each of "foo" and "bar".
+ * E.g. if a key is defined as [foo, bar], then the result rows must contain a column
+ * for each of "foo" and "bar".
*/
private def exportForKey(rdd: RDD[Seq[PagedReadablePartition]],
exportKey: Seq[String],
@@ -101,9 +103,12 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
index.get
})

- val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_, exportTableConfig)).filter{ row =>
+ val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_, exportTableConfig)).filter { row =>
    val rowKey = columnKeyIndices.map(row.get(_).toString)
    rowKey == exportKey
- }
+ }.map { row =>
+   numRowsExported.increment()
+   row
+ }

// write filteredRowRdd to iceberg table
@@ -121,10 +126,10 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
def run(sparkConf: SparkConf): SparkSession = {

val spark = Class.forName(settings.sparkSessionFactoryClass)
-   .getDeclaredConstructor()
-   .newInstance()
-   .asInstanceOf[SparkSessionFactory]
-   .make(sparkConf)
+   .getDeclaredConstructor()
+   .newInstance()
+   .asInstanceOf[SparkSessionFactory]
+   .make(sparkConf)

DownsamplerContext.dsLogger.info(s"Spark Job Properties: ${spark.sparkContext.getConf.toDebugString}")

@@ -134,11 +139,11 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
// Specified during reruns for downsampling old data
val userTimeInPeriod: Long = spark.sparkContext.getConf
.getOption("spark.filodb.downsampler.userTimeOverride") match {
- // by default assume a time in the previous downsample period
- case None => System.currentTimeMillis() - settings.downsampleChunkDuration
- // examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
- case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
- }
+ // by default assume a time in the previous downsample period
+ case None => System.currentTimeMillis() - settings.downsampleChunkDuration
+ // examples: 2019-10-20T12:34:56Z or 2019-10-20T12:34:56-08:00
+ case Some(str) => Instant.from(DateTimeFormatter.ISO_OFFSET_DATE_TIME.parse(str)).toEpochMilli()
+ }

val userTimeStart: Long = (userTimeInPeriod / settings.downsampleChunkDuration) * settings.downsampleChunkDuration
val userTimeEndExclusive: Long = userTimeStart + settings.downsampleChunkDuration
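The two lines above floor the chosen time down to a chunk boundary via integer division on epoch milliseconds. A quick worked example, with a hypothetical 6-hour chunk duration (the actual value comes from DownsamplerSettings):

val downsampleChunkDuration = 6L * 60 * 60 * 1000   // 21,600,000 ms, assumed purely for illustration
val userTimeInPeriod = java.time.Instant.parse("2019-10-20T14:30:00Z").toEpochMilli
val userTimeStart = (userTimeInPeriod / downsampleChunkDuration) * downsampleChunkDuration
val userTimeEndExclusive = userTimeStart + downsampleChunkDuration
// userTimeStart lands on 2019-10-20T12:00:00Z and userTimeEndExclusive on 18:00:00Z,
// i.e. the chunk-aligned window that contains the requested instant.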
@@ -185,7 +190,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
Kamon.init()
KamonShutdownHook.registerShutdownHook()
// convert each RawPartData to a ReadablePartition
- rawPartsBatch.map{ rawPart =>
+ rawPartsBatch.map { rawPart =>
val rawSchemaId = RecordSchema.schemaID(rawPart.partitionKey, UnsafeUtils.arayOffset)
val rawPartSchema = batchDownsampler.schemas(rawSchemaId)
new PagedReadablePartition(rawPartSchema, shard = 0, partID = 0, partData = rawPart, minResolutionMs = 1)
@@ -195,7 +200,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
val rddWithDs = if (settings.chunkDownsamplerIsEnabled) {
// Downsample the data.
// NOTE: this step returns each row of the RDD as-is; it does NOT return downsampled data.
- rdd.map{ part =>
+ rdd.map { part =>
batchDownsampler.downsampleBatch(part)
part
}
@@ -211,8 +216,10 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
val firstExportTaskWithDs = Seq(() =>
exportForKey(rddWithDs, exportKeyToRules.head._1, exportKeyToRules.head._2, batchExporter, spark))
// export all remaining keys without the downsample step
- val remainingExportTasksWithoutDs = exportKeyToRules.tail.map{spec => () =>
-   exportForKey(rdd, spec._1, spec._2, batchExporter, spark)}
+ val remainingExportTasksWithoutDs = exportKeyToRules.tail.map { spec =>
+   () =>
+     exportForKey(rdd, spec._1, spec._2, batchExporter, spark)
+ }
// create a parallel sequence of tasks
(firstExportTaskWithDs ++ remainingExportTasksWithoutDs).par
}
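For reference, the hunk above builds the exports as deferred thunks and then leans on Scala's parallel collections: each export is wrapped as a zero-argument function, so nothing runs until the parallel sequence invokes it, and the wrapped tasks then execute concurrently. A small self-contained sketch of that idiom follows; the task names and the println body are placeholders, and the import is only needed on Scala 2.13+ with the scala-parallel-collections module (on 2.12, .par is built in).

import scala.collection.parallel.CollectionConverters._

def exportTask(name: String): Unit =
  println(s"exporting $name on ${Thread.currentThread().getName}")

// Defer each export behind () => Unit, mirroring firstExportTaskWithDs ++ remainingExportTasksWithoutDs.
val tasks: Seq[() => Unit] =
  Seq(() => exportTask("first-key-with-downsample")) ++
    Seq("key-a", "key-b").map(k => () => exportTask(k))

// .par converts the Seq into a parallel collection; foreach then runs the thunks concurrently.
tasks.par.foreach(task => task())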
