** WIP ** feat(downsample): export to multiple locations #1706

Closed · wants to merge 7 commits

2 changes: 1 addition & 1 deletion core/src/main/resources/filodb-defaults.conf
@@ -575,7 +575,7 @@ filodb {
# These labels will be dropped from every exported row.
drop-labels = ["_ws_", "drop-label1"]

bucket = "file://<path-to-file>"
destination-format = "file://<path-to-file>/$0"

# Each row's labels are compared against all rule-group keys. If a match is found,
# the row's labels are compared *sequentially* against each of the group's rules until
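
The `$0` placeholder in `destination-format` is positional: it is filled from the exported row's values for the configured `data-export.key-labels`, in order ($0 from the first key label, $1 from the second, and so on). A minimal sketch of that substitution, using illustrative label values and paths that are not taken from this PR:

```scala
import scala.util.matching.Regex

// Illustrative only: suppose key-labels = ["_ws_", "_ns_"] and one exported row's
// label values form the export key Seq("my-ws", "my-ns").
val destinationFormat = "file:///exports/$0/$1"
val exportKey = Seq("my-ws", "my-ns")

// Each $i placeholder is replaced by the i-th element of the export key.
val placeholder: Regex = "\\$([0-9]+)".r
val resolved = placeholder.replaceAllIn(destinationFormat, m => exportKey(m.group(1).toInt))
// resolved == "file:///exports/my-ws/my-ns"
```
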
@@ -101,6 +101,28 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}.map(_._2).toSet
}

/**
* Returns the index of a column in the export schema, or None if no column has that name.
* E.g. "foo" yields `Some(3)` if "foo" is the name of the fourth column (zero-indexed)
* of each exported row.
*/
def getRowIndex(fieldName: String): Option[Int] = {
exportSchema.zipWithIndex.find(_._1.name == fieldName).map(_._2)
}

/**
* Returns the configured exportDestinationFormat with each $i substring (e.g. $0, $1, $2)
* replaced by the i-th element of the argument export key.
*/
def makeExportAddress(exportKey: Seq[String]): String = {
val replaceRegex = "\\$([0-9]+)".r
replaceRegex.replaceAllIn(downsamplerSettings.exportDestinationFormat, matcher => {
// Replace e.g. $0 with the 0th index of the export key.
val index = matcher.group(1).toInt
exportKey(index)
})
}

// Unused, but keeping here for convenience if needed later.
private def hashToString(bytes: Array[Byte]): String = {
MessageDigest.getInstance("SHA-256")
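
A usage sketch of the two new helpers, reusing the destination-format and export key exercised by the spec further down ("address/foo/bar/$0/$1" with key `Seq("l1a", "l2a")`); the settings object and time bounds are placeholders rather than real values:

```scala
// Sketch only: dsSettings, userTimeStartMs and userTimeStopMs stand in for real values.
val exporter = new BatchExporter(dsSettings, userTimeStartMs, userTimeStopMs)

// Position of a column in the export schema; None if no column has that name.
val yearIndex: Option[Int] = exporter.getRowIndex("year")   // Some(3) with the path-spec used in the spec

// Destination resolved for one export key, given destination-format = "address/foo/bar/$0/$1".
val address: String = exporter.makeExportAddress(Seq("l1a", "l2a"))
// address == "address/foo/bar/l1a/l2a"
```
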
@@ -6,7 +6,8 @@ import java.time.format.DateTimeFormatter
import kamon.Kamon
import kamon.metric.MeasurementUnit
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession}

import filodb.coordinator.KamonShutdownHook
import filodb.core.binaryrecord2.RecordSchema
@@ -75,6 +76,45 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
lazy val exportLatency =
Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags()

/**
* Exports an RDD for a specific export key.
* (1) Create the export destination address from the key.
* (2) Generate all rows to be exported from the argument RDD.
* (3) Filter for only rows relevant to the export key.
* (4) Write the filtered rows to the destination address.
* NOTE: the export schema is required to define a column for each field of an export key.
* E.g. if a key is defined as [foo, bar], then the result rows must contain a column
* for each of "foo" and "bar".
*/
private def exportForKey(rdd: RDD[Seq[PagedReadablePartition]],
exportKey: Seq[String],
batchExporter: BatchExporter,
sparkSession: SparkSession): Unit = {
val exportStartMs = System.currentTimeMillis()

val rowKeyIndices = settings.exportRuleKey.map(colName => {
val index = batchExporter.getRowIndex(colName)
assert(index.isDefined, "export-key column name does not exist in pending-export row: " + colName)
index.get
})

val saveAddress = batchExporter.makeExportAddress(exportKey)
val filteredRowRdd = rdd.flatMap(batchExporter.getExportRows(_)).filter{ row =>
val rowKey = rowKeyIndices.map(row.get(_).toString)
rowKey == exportKey
}

sparkSession.createDataFrame(filteredRowRdd, batchExporter.exportSchema)
.write
.format(settings.exportFormat)
.mode(settings.exportSaveMode)
.options(settings.exportOptions)
.partitionBy(batchExporter.partitionByNames: _*)
.save(saveAddress)
val exportEndMs = System.currentTimeMillis()
exportLatency.record(exportEndMs - exportStartMs)
}

// Gotcha!! Need separate function (Cannot be within body of a class)
// to create a closure for spark to serialize and move to executors.
// Otherwise, config values below were not being sent over.
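
To make step (3) concrete: a row is written to a destination only when the values in its key columns equal the export key element-for-element. A self-contained sketch of that comparison, with made-up column positions and values (plain `Seq`s stand in for Spark `Row`s):

```scala
// Illustrative: the export-key columns ("_ws_", "_ns_") sit at indices 3 and 4 of each row.
val rowKeyIndices = Seq(3, 4)
val exportKey     = Seq("my-ws", "my-ns")

// Same comparison as the filter inside exportForKey, minus the Spark Row API.
def matchesKey(row: Seq[Any]): Boolean =
  rowKeyIndices.map(i => row(i).toString) == exportKey

val kept    = matchesKey(Seq(2024, 1, 15, "my-ws", "my-ns"))     // true  -> exported here
val skipped = matchesKey(Seq(2024, 1, 15, "other-ws", "my-ns"))  // false -> filtered out
```
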
@@ -143,40 +183,40 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
cassFetchSize = settings.cassFetchSize)
batchIter
}
.flatMap { rawPartsBatch =>
.map { rawPartsBatch =>
Kamon.init()
KamonShutdownHook.registerShutdownHook()
// convert each RawPartData to a ReadablePartition
val readablePartsBatch = rawPartsBatch.map{ rawPart =>
rawPartsBatch.map{ rawPart =>
val rawSchemaId = RecordSchema.schemaID(rawPart.partitionKey, UnsafeUtils.arayOffset)
val rawPartSchema = batchDownsampler.schemas(rawSchemaId)
new PagedReadablePartition(rawPartSchema, shard = 0, partID = 0, partData = rawPart, minResolutionMs = 1)
}
// Downsample the data (this step does not contribute to the RDD).
if (settings.chunkDownsamplerIsEnabled) {
batchDownsampler.downsampleBatch(readablePartsBatch)
}
// Generate the data for the RDD.
if (settings.exportIsEnabled) {
batchExporter.getExportRows(readablePartsBatch)
} else Iterator.empty
}

// Export the data produced by "getExportRows" above.
if (settings.exportIsEnabled) {
val exportStartMs = System.currentTimeMillis()
// NOTE: toDF(partitionCols: _*) seems buggy
spark.createDataFrame(rdd, batchExporter.exportSchema)
.write
.format(settings.exportFormat)
.mode(settings.exportSaveMode)
.options(settings.exportOptions)
.partitionBy(batchExporter.partitionByNames: _*)
.save(settings.exportBucket)
val exportEndMs = System.currentTimeMillis()
exportLatency.record(exportEndMs - exportStartMs)
val rddWithDs = if (settings.chunkDownsamplerIsEnabled) {
// Downsample the data.
// NOTE: this step returns each row of the RDD as-is; it does NOT return downsampled data.
rdd.map{ part =>
batchDownsampler.downsampleBatch(part)
part
}
} else rdd

if (settings.exportIsEnabled && settings.exportKeyToRules.nonEmpty) {
val exportKeys = settings.exportKeyToRules.keys.toSeq
val exportTasks = {
// downsample the data as the first key is exported
val headTask = Seq(() => exportForKey(rddWithDs, exportKeys.head, batchExporter, spark))
// export all remaining keys without the downsample step
val tailTasks = exportKeys.tail.map(key => () => exportForKey(rdd, key, batchExporter, spark))
headTask ++ tailTasks
}
// export/downsample RDDs in parallel
exportTasks.par.foreach(_.apply())
} else {
rdd.foreach(_ => {})
// Just invoke the DS step.
rddWithDs.foreach(_ => {})
}

DownsamplerContext.dsLogger.info(s"Chunk Downsampling Driver completed successfully for downsample period " +
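
The reworked driver fans one batch RDD out to several destinations: the first export key is evaluated against `rddWithDs`, so the downsample pass piggybacks on that Spark action, while the remaining keys reuse the plain `rdd`, and the deferred tasks run concurrently through Scala's parallel collections. A minimal standalone illustration of the fan-out pattern (destinations and task bodies are placeholders; on Scala 2.13+ `.par` also needs the scala-parallel-collections module):

```scala
// Build one deferred task per destination, then evaluate them in parallel.
val destinations = Seq("file:///exports/key-a", "file:///exports/key-b")

val tasks: Seq[() => Unit] =
  destinations.map(dest => () => println(s"writing export to $dest"))

// Each task is independent; .par runs them concurrently from the driver,
// which is what lets every export key get its own DataFrame write.
tasks.par.foreach(_.apply())
```
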
@@ -80,7 +80,7 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ

@transient lazy val exportRuleKey = downsamplerConfig.as[Seq[String]]("data-export.key-labels")

@transient lazy val exportBucket = downsamplerConfig.as[String]("data-export.bucket")
@transient lazy val exportDestinationFormat = downsamplerConfig.as[String]("data-export.destination-format")

@transient lazy val exportDropLabels = downsamplerConfig.as[Seq[String]]("data-export.drop-labels")

@@ -52,13 +52,13 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))

// Add a path here to enable export during these tests. Useful for debugging export data.
val exportToFile = None // Some("file:///path/to/dir/")
val exportToFile = None // Some("file:///path/to/dir/$0")
val exportConf =
s"""{
| "filodb": { "downsampler": { "data-export": {
| "enabled": ${exportToFile.isDefined},
| "key-labels": [],
| "bucket": "${exportToFile.getOrElse("")}",
| "destination-format": "${exportToFile.getOrElse("")}",
| "format": "csv",
| "options": {
| "header": true,
@@ -400,6 +400,48 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
}
}

it ("should correctly generate export address from labels") {
val testConf = ConfigFactory.parseString(
"""
| filodb.downsampler.data-export {
| enabled = true
| key-labels = ["l1", "l2"]
| destination-format = "address/foo/bar/$0/$1"
| }
|""".stripMargin
).withFallback(conf)

val dsSettings = new DownsamplerSettings(testConf.withFallback(conf))
val batchExporter = new BatchExporter(dsSettings, dummyUserTimeStart, dummyUserTimeStop)

batchExporter.makeExportAddress(Seq("l1a", "l2a")) shouldEqual "address/foo/bar/l1a/l2a"
}

it ("should give correct export-row index of column name") {
val testConf = ConfigFactory.parseString(
"""
| filodb.downsampler.data-export {
| enabled = true
| key-labels = ["l1", "l2"]
| path-spec: [
| year, "<<YYYY>>",
| month, "<<M>>",
| day, "<<d>>",
| _metric_, "{{__name__}}"
| ]
| }
|""".stripMargin
).withFallback(conf)

val dsSettings = new DownsamplerSettings(testConf.withFallback(conf))
val batchExporter = new BatchExporter(dsSettings, dummyUserTimeStart, dummyUserTimeStop)
batchExporter.getRowIndex("year").get shouldEqual 3
batchExporter.getRowIndex("month").get shouldEqual 4
batchExporter.getRowIndex("day").get shouldEqual 5
batchExporter.getRowIndex("_metric_").get shouldEqual 6

}

it ("should correctly generate export path names") {
val baseConf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf"))
