From bb0d56e8e7ff70e5d45b35aaf0e188b5750d80c0 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Mon, 22 Jan 2024 15:40:44 -0800 Subject: [PATCH 1/7] upgrade spark to 3.4.0 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8c370fae55..174b3c1956 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,7 +23,7 @@ object Dependencies { val ficusVersion = "1.3.4" val kamonBundleVersion = "2.2.2" val monixKafkaVersion = "1.0.0-RC6" - val sparkVersion = "2.4.8" + val sparkVersion = "3.4.0" val sttpVersion = "1.3.3" /* Dependencies shared */ From a50d90238b771c980ac28fccf757a2174164eddd Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Mon, 29 Jan 2024 13:03:13 -0800 Subject: [PATCH 2/7] enable export to multiple destinations --- core/src/main/resources/filodb-defaults.conf | 2 +- .../downsampler/chunk/BatchExporter.scala | 9 ++ .../downsampler/chunk/DownsamplerMain.scala | 103 +++++++++++++----- .../chunk/DownsamplerSettings.scala | 2 +- .../downsampler/DownsamplerMainSpec.scala | 4 +- 5 files changed, 91 insertions(+), 29 deletions(-) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index 72684f7bcd..1824f504dd 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -575,7 +575,7 @@ filodb { # These labels will be dropped from every exported row. drop-labels = ["_ws_", "drop-label1"] - bucket = "file://" + destination-format = "file:///$0" # Each row's labels are compared against all rule-group keys. If a match is found, # the row's labels are compared *sequentially* against each of the group's rules until diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala index 100bce1b54..1bae366b27 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala @@ -101,6 +101,15 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime }.map(_._2).toSet } + /** + * Returns the index of a column in the export schema. + * E.g. "foo" will return `3` if "foo" is the name of the fourth column (zero-indexed) + * of each exported row. + */ + def getRowIndex(fieldName: String): Option[Int] = { + exportSchema.zipWithIndex.find(_._1.name == fieldName).map(_._2) + } + // Unused, but keeping here for convenience if needed later. 
private def hashToString(bytes: Array[Byte]): String = { MessageDigest.getInstance("SHA-256") diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index da87071cdc..ffa072dd65 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -6,7 +6,8 @@ import java.time.format.DateTimeFormatter import kamon.Kamon import kamon.metric.MeasurementUnit import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SparkSession} import filodb.coordinator.KamonShutdownHook import filodb.core.binaryrecord2.RecordSchema @@ -75,6 +76,58 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable { lazy val exportLatency = Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags() + /** + * Returns the configured exportDestinationFormat with all $i (e.g. $0, $1, $2, etc) + * substrings replaced with the ith string of the argument export key. + */ + private def makeExportAddress(exportKey: Seq[String]): String = { + val replaceRegex = "\\$([0-9]+)".r + replaceRegex.replaceAllIn(settings.exportDestinationFormat, matcher => { + // Replace e.g. $0 with the 0th index of the export key. + val index = matcher.group(1).toInt + exportKey(index) + }) + } + + /** + * Exports an RDD for a specific export key. + * (1) Create the export destination address from the key. + * (2) Generate all rows to be exported from the argument RDD. + * (3) Filter for only rows relevant to the export key. + * (4) Write the filtered rows to the destination address. + * NOTE: the export schema is required to define a column for each field of an export key. + * E.g. if a key is defined as [foo, bar], then the result rows must contain a column + * for each of "foo" and "bar". + */ + private def exportForKey(rdd: RDD[Seq[PagedReadablePartition]], + exportKey: Seq[String], + batchExporter: BatchExporter, + sparkSession: SparkSession): Unit = { + val exportStartMs = System.currentTimeMillis() + + val rowKeyIndices = settings.exportRuleKey.map(colName => { + val index = batchExporter.getRowIndex(colName) + assert(index.isDefined, "export-key column name does not exist in pending-export row: " + colName) + index.get + }) + + val saveAddress = makeExportAddress(exportKey) + val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_)).filter{ row => + val rowKey = rowKeyIndices.map(row.get(_).toString) + rowKey == exportKey + } + + sparkSession.createDataFrame(filteredRowRdd, batchExporter.exportSchema) + .write + .format(settings.exportFormat) + .mode(settings.exportSaveMode) + .options(settings.exportOptions) + .partitionBy(batchExporter.partitionByNames: _*) + .save(saveAddress) + val exportEndMs = System.currentTimeMillis() + exportLatency.record(exportEndMs - exportStartMs) + } + // Gotcha!! Need separate function (Cannot be within body of a class) // to create a closure for spark to serialize and move to executors. // Otherwise, config values below were not being sent over. 
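
As a quick illustration of the substitution performed by makeExportAddress above: the destination format and export-key values below are made-up examples (not values from this repo's config), but the regex logic mirrors the patch. A minimal sketch:

    // Hypothetical config values, for illustration only:
    //   destination-format = "s3a://bucket/$0/$1"
    //   key-labels         = ["_ws_", "_ns_"]
    val destinationFormat = "s3a://bucket/$0/$1"
    val exportKey = Seq("my-workspace", "my-namespace")
    val replaceRegex = "\\$([0-9]+)".r
    // $0 is replaced with exportKey(0), $1 with exportKey(1), etc.
    val address = replaceRegex.replaceAllIn(destinationFormat, m => exportKey(m.group(1).toInt))
    // address == "s3a://bucket/my-workspace/my-namespace"

Each export key therefore resolves to its own save address, which is how a single downsampler run can write to multiple destinations.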
@@ -143,40 +196,40 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
                                          cassFetchSize = settings.cassFetchSize)
         batchIter
       }
-      .flatMap { rawPartsBatch =>
+      .map { rawPartsBatch =>
         Kamon.init()
         KamonShutdownHook.registerShutdownHook()
         // convert each RawPartData to a ReadablePartition
-        val readablePartsBatch = rawPartsBatch.map{ rawPart =>
+        rawPartsBatch.map{ rawPart =>
           val rawSchemaId = RecordSchema.schemaID(rawPart.partitionKey, UnsafeUtils.arayOffset)
           val rawPartSchema = batchDownsampler.schemas(rawSchemaId)
           new PagedReadablePartition(rawPartSchema, shard = 0, partID = 0, partData = rawPart, minResolutionMs = 1)
         }
-        // Downsample the data (this step does not contribute the the RDD).
-        if (settings.chunkDownsamplerIsEnabled) {
-          batchDownsampler.downsampleBatch(readablePartsBatch)
-        }
-        // Generate the data for the RDD.
-        if (settings.exportIsEnabled) {
-          batchExporter.getExportRows(readablePartsBatch)
-        } else Iterator.empty
       }
-    // Export the data produced by "getExportRows" above.
-    if (settings.exportIsEnabled) {
-      val exportStartMs = System.currentTimeMillis()
-      // NOTE: toDF(partitionCols: _*) seems buggy
-      spark.createDataFrame(rdd, batchExporter.exportSchema)
-        .write
-        .format(settings.exportFormat)
-        .mode(settings.exportSaveMode)
-        .options(settings.exportOptions)
-        .partitionBy(batchExporter.partitionByNames: _*)
-        .save(settings.exportBucket)
-      val exportEndMs = System.currentTimeMillis()
-      exportLatency.record(exportEndMs - exportStartMs)
+    val rddWithDs = if (settings.chunkDownsamplerIsEnabled) {
+      // Downsample the data.
+      // NOTE: this step returns each row of the RDD as-is; it does NOT return downsampled data.
+      rdd.map{ part =>
+        batchDownsampler.downsampleBatch(part)
+        part
+      }
+    } else rdd
+
+    if (settings.exportIsEnabled && settings.exportKeyToRules.nonEmpty) {
+      val exportKeys = settings.exportKeyToRules.keys.toSeq
+
+      // handle first export key; include the downsample step
+      exportForKey(rddWithDs, exportKeys.head, batchExporter, spark)
+
+      // handle remaining export keys; no need to downsample for these
+      for (exportKey <- exportKeys.tail) {
+        exportForKey(rdd, exportKey, batchExporter, spark)
+      }
+    } else {
-      rdd.foreach(_ => {})
+      // Just invoke the DS step.
+ rddWithDs.foreach(_ => {}) } DownsamplerContext.dsLogger.info(s"Chunk Downsampling Driver completed successfully for downsample period " + diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala index a39741b134..f45b0cc70f 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala @@ -80,7 +80,7 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ @transient lazy val exportRuleKey = downsamplerConfig.as[Seq[String]]("data-export.key-labels") - @transient lazy val exportBucket = downsamplerConfig.as[String]("data-export.bucket") + @transient lazy val exportDestinationFormat = downsamplerConfig.as[String]("data-export.destination-format") @transient lazy val exportDropLabels = downsamplerConfig.as[Seq[String]]("data-export.drop-labels") diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala index ead8184bfd..fc06a6db0f 100644 --- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala +++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala @@ -52,13 +52,13 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis)) // Add a path here to enable export during these tests. Useful for debugging export data. - val exportToFile = None // Some("file:///path/to/dir/") + val exportToFile = None // Some("file:///path/to/dir/$0") val exportConf = s"""{ | "filodb": { "downsampler": { "data-export": { | "enabled": ${exportToFile.isDefined}, | "key-labels": [], - | "bucket": "${exportToFile.getOrElse("")}", + | "destination-format": "${exportToFile.getOrElse("")}", | "format": "csv", | "options": { | "header": true, From 3b9e4088fa44a24c14e5e2556a20d912acbb2c3d Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Mon, 29 Jan 2024 13:47:15 -0800 Subject: [PATCH 3/7] move makeExportAddress to BatchExporter --- .../filodb/downsampler/chunk/BatchExporter.scala | 13 +++++++++++++ .../downsampler/chunk/DownsamplerMain.scala | 15 +-------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala index 1bae366b27..3ea34bad3f 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala @@ -110,6 +110,19 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime exportSchema.zipWithIndex.find(_._1.name == fieldName).map(_._2) } + /** + * Returns the configured exportDestinationFormat with all $i (e.g. $0, $1, $2, etc) + * substrings replaced with the ith string of the argument export key. + */ + def makeExportAddress(exportKey: Seq[String]): String = { + val replaceRegex = "\\$([0-9]+)".r + replaceRegex.replaceAllIn(downsamplerSettings.exportDestinationFormat, matcher => { + // Replace e.g. $0 with the 0th index of the export key. + val index = matcher.group(1).toInt + exportKey(index) + }) + } + // Unused, but keeping here for convenience if needed later. 
private def hashToString(bytes: Array[Byte]): String = { MessageDigest.getInstance("SHA-256") diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index ffa072dd65..cbfa7ad5f5 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -76,19 +76,6 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable { lazy val exportLatency = Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags() - /** - * Returns the configured exportDestinationFormat with all $i (e.g. $0, $1, $2, etc) - * substrings replaced with the ith string of the argument export key. - */ - private def makeExportAddress(exportKey: Seq[String]): String = { - val replaceRegex = "\\$([0-9]+)".r - replaceRegex.replaceAllIn(settings.exportDestinationFormat, matcher => { - // Replace e.g. $0 with the 0th index of the export key. - val index = matcher.group(1).toInt - exportKey(index) - }) - } - /** * Exports an RDD for a specific export key. * (1) Create the export destination address from the key. @@ -111,7 +98,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable { index.get }) - val saveAddress = makeExportAddress(exportKey) + val saveAddress = batchExporter.makeExportAddress(exportKey) val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_)).filter{ row => val rowKey = rowKeyIndices.map(row.get(_).toString) rowKey == exportKey From 9c75b079611521343f38bcc3b81a852d47a6d14c Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Mon, 29 Jan 2024 14:14:53 -0800 Subject: [PATCH 4/7] add tests --- .../downsampler/chunk/DownsamplerMain.scala | 2 +- .../downsampler/DownsamplerMainSpec.scala | 42 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index cbfa7ad5f5..f0c0fdbd1b 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -7,7 +7,7 @@ import kamon.Kamon import kamon.metric.MeasurementUnit import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{SparkSession} import filodb.coordinator.KamonShutdownHook import filodb.core.binaryrecord2.RecordSchema diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala index fc06a6db0f..e577353e94 100644 --- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala +++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala @@ -400,6 +400,48 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl } } + it ("should correctly generate export address from labels") { + val testConf = ConfigFactory.parseString( + """ + | filodb.downsampler.data-export { + | enabled = true + | key-labels = ["l1", "l2"] + | destination-format = "address/foo/bar/$0/$1" + | } + |""".stripMargin + ).withFallback(conf) + + val dsSettings = new DownsamplerSettings(testConf.withFallback(conf)) + val batchExporter = new BatchExporter(dsSettings, dummyUserTimeStart, dummyUserTimeStop) + + 
batchExporter.makeExportAddress(Seq("l1a", "l2a")) shouldEqual "address/foo/bar/l1a/l2a" + } + + it ("should give correct export-row index of column name") { + val testConf = ConfigFactory.parseString( + """ + | filodb.downsampler.data-export { + | enabled = true + | key-labels = ["l1", "l2"] + | path-spec: [ + | year, "<>", + | month, "<>", + | day, "<>", + | _metric_, "{{__name__}}" + | ] + | } + |""".stripMargin + ).withFallback(conf) + + val dsSettings = new DownsamplerSettings(testConf.withFallback(conf)) + val batchExporter = new BatchExporter(dsSettings, dummyUserTimeStart, dummyUserTimeStop) + batchExporter.getRowIndex("year").get shouldEqual 3 + batchExporter.getRowIndex("month").get shouldEqual 4 + batchExporter.getRowIndex("day").get shouldEqual 5 + batchExporter.getRowIndex("_metric_").get shouldEqual 6 + + } + it ("should correctly generate export path names") { val baseConf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf")) From fc894800b6640738cd5f8b334eb0ae94c94a350e Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Mon, 29 Jan 2024 14:15:49 -0800 Subject: [PATCH 5/7] Revert "upgrade spark to 3.4.0" This reverts commit bb0d56e8e7ff70e5d45b35aaf0e188b5750d80c0. Tests do not run correctly after the upgrade. --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 174b3c1956..8c370fae55 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -23,7 +23,7 @@ object Dependencies { val ficusVersion = "1.3.4" val kamonBundleVersion = "2.2.2" val monixKafkaVersion = "1.0.0-RC6" - val sparkVersion = "3.4.0" + val sparkVersion = "2.4.8" val sttpVersion = "1.3.3" /* Dependencies shared */ From 83625490891a8c6179a5da45fa53d1b9c3406820 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Tue, 6 Feb 2024 21:47:12 -0800 Subject: [PATCH 6/7] export RDDs in parallel --- .../downsampler/chunk/DownsamplerMain.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index f0c0fdbd1b..62ff2d4e5d 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -205,15 +205,14 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable { if (settings.exportIsEnabled && settings.exportKeyToRules.nonEmpty) { val exportKeys = settings.exportKeyToRules.keys.toSeq - - // handle first export key; include the downsample step - exportForKey(rddWithDs, exportKeys.head, batchExporter, spark) - - // handle remaining export keys; no need to downsample for these - for (exportKey <- exportKeys.tail) { - exportForKey(rdd, exportKey, batchExporter, spark) + val exportTasks = { + // downsample the data as the first key is exported + Seq(() => exportForKey(rddWithDs, exportKeys.head, batchExporter, spark)) ++ + // export all remaining keys without the downsample step + exportKeys.tail.map(key => () => exportForKey(rdd, key, batchExporter, spark)) } - + // export/downsample RDDs in parallel + exportTasks.par.foreach(_.apply()) } else { // Just invoke the DS step. 
rddWithDs.foreach(_ => {}) From 70b5d216801fb2b28b61f8235405d4997d788992 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Tue, 6 Feb 2024 21:48:40 -0800 Subject: [PATCH 7/7] cleanup --- .../scala/filodb/downsampler/chunk/DownsamplerMain.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala index 62ff2d4e5d..c012abb8be 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerMain.scala @@ -207,9 +207,10 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable { val exportKeys = settings.exportKeyToRules.keys.toSeq val exportTasks = { // downsample the data as the first key is exported - Seq(() => exportForKey(rddWithDs, exportKeys.head, batchExporter, spark)) ++ - // export all remaining keys without the downsample step - exportKeys.tail.map(key => () => exportForKey(rdd, key, batchExporter, spark)) + val headTask = Seq(() => exportForKey(rddWithDs, exportKeys.head, batchExporter, spark)) + // export all remaining keys without the downsample step + val tailTasks = exportKeys.tail.map(key => () => exportForKey(rdd, key, batchExporter, spark)) + headTask ++ tailTasks } // export/downsample RDDs in parallel exportTasks.par.foreach(_.apply())
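
The final two patches rely on Scala parallel collections to submit the per-destination Spark actions concurrently. A rough sketch of that pattern, with println stand-ins for the real exportForKey calls (the task bodies below are placeholders, not code from this repo):

    // Each task is a no-arg closure; nothing runs until apply() is called.
    val exportTasks: Seq[() => Unit] = Seq(
      () => println("export for key A"),  // stands in for exportForKey(rddWithDs, exportKeys.head, ...)
      () => println("export for key B")   // stands in for exportForKey(rdd, key, ...)
    )
    // .par runs the closures on Scala's default parallel-collections task support
    // (a shared thread pool), so the Spark jobs for the different destinations are
    // submitted in parallel rather than one after another.
    exportTasks.par.foreach(_.apply())

Only the head task uses rddWithDs, so the downsample step is still tied to the first key's export, while the remaining keys are exported from the plain rdd without it, matching the sequential loop this change replaces.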