
feat(downsample): support export to multiple paths in Iceberg format #1720

Merged
merged 7 commits into develop on Feb 26, 2024

Conversation

@alextheimer (Contributor) commented Feb 23, 2024

Pull Request checklist

  • The commit message(s) follow the contribution guidelines?
  • Tests for the changes have been added (for bug fixes / features)?
  • Docs have been added / updated (for bug fixes / features)?

Adds support for exporting data from the downsampler job to multiple destinations; data is now exported in Iceberg format.
This PR removes all support for exporting CSV-formatted data.

From #1718:

Current behavior:
The Spark Downsampler job currently exports data in CSV format.

New behavior:
Changes the Spark Downsampler job to export data in Iceberg format and write it to an Iceberg table with a specific schema.

The changes include the following:

  • Updates BatchExporter.scala to create an RDD with the new schema and convert the RDD to a DataFrame so it can be written to an Iceberg table.
  • filodb-defaults.conf changes for export include the following (see the config sketch after this list):
  1. Adds catalog and database Iceberg properties to filodb-defaults.conf.
  2. Under groups, each group should include additional dynamic, time-series-label-based columns in addition to the fixed columns of the Iceberg table.
  3. Each group should also include the table name and table location.
  4. Each group can also specify whether partitioning should be done by additional label-based columns.
  • To implement the above, additional changes are made in DownsamplerSettings.scala, BatchExporter.scala, and DownsamplerMain.scala.
  • Implements a new ExportTableConfig class to carry all per-group data (table path, table name, export rules) parsed from the config.
  • ExportTableConfig is also used to generate the export schema dynamically.
  • Adds Iceberg create-table and create-database methods.
  • In Iceberg, data partitioning is handled directly by the CREATE TABLE SQL statement.
  • Renames the destination-format key in filodb-defaults.conf to destination-path.
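
For illustration, a hypothetical per-group export config under the new shape might look like the following. The table, table-path, and label-column-mapping keys match the keys parsed in DownsamplerSettings.scala (shown later in this conversation); the catalog/database key names, the partition key name, and all values are assumptions, not the exact contents of filodb-defaults.conf:

# Sketch only; catalog/database and partition key names are assumptions.
catalog = "local"
database = "downsampled_db"
groups = [
  {
    table = "metrics_table"
    table-path = "s3://bucket/warehouse/metrics_table"
    # flat list of alternating (label, column) names, parsed into pairs
    label-column-mapping = ["_ws_", "workspace", "_ns_", "namespace"]
    # optional additional label-based partition columns
    partition-by-columns = ["namespace"]
  }
]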

Additional changes to remove the old CSV-based export:

  • Removes the old CSV-based partition-by-template handling used to generate data partition keys.
  • Removes CSV-specific properties like options and path-spec from filodb-defaults.conf. As mentioned above, Iceberg does not require a path spec for partitioning; it is handled by the Iceberg CREATE TABLE statement.
  • Removes the old CSV-based make-label-string methods/properties. In the Iceberg table, labels are stored in a labels column of type map<string, string>, so stringified labels are no longer required.

- private def exportDataToRow(exportData: ExportRowData): Row = {
-   val dataSeq = new mutable.ArrayBuffer[Any](3 + downsamplerSettings.exportPathSpecPairs.size)
+ private def exportDataToRow(exportData: ExportRowData, exportTableConfig: ExportTableConfig): Row = {
+   val dataSeq = new mutable.ArrayBuffer[Any](exportTableConfig.tableSchema.fields.length)
Contributor:

Quick question: is the size we specify here the array size?

Contributor:

Yes, this is the array size for dataSeq. The size depends on the number of standard plus dynamic columns in tableSchema.

val dynamicColNames = exportTableConfig.labelColumnMapping.map(pair => pair._2 + " string").mkString(", ")
val partitionColNames = exportTableConfig.partitionByCols.mkString(", ")
s"""
|CREATE TABLE IF NOT EXISTS $catalog.$database.${exportTableConfig.tableName} (
Contributor:

Nitpick (not blocking): should we move this string constant up and only do the string formatting inside this function?

Contributor:

This is not really a string constant; the string is created dynamically from the function's input parameters.
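
For context, a minimal sketch of how the full statement might come together. The fixed columns shown (timestamp, value) and the LOCATION clause are assumptions; the labels map<string, string> column, the dynamic label columns, and SQL-driven partitioning are described elsewhere in this PR, and dynamicColNames, partitionColNames, catalog, database, and tableName come from the snippet above:

// Sketch only, not the exact SQL merged in this PR; assumes a SparkSession
// named `spark` is in scope.
val createTableSql =
  s"""
     |CREATE TABLE IF NOT EXISTS $catalog.$database.${exportTableConfig.tableName} (
     |  timestamp bigint,
     |  value double,
     |  labels map<string, string>,
     |  $dynamicColNames
     |)
     |USING iceberg
     |PARTITIONED BY ($partitionColNames)
     |LOCATION '${exportTableConfig.tablePath}'
     |""".stripMargin
spark.sql(createTableSql)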

- for ((value, expected) <- inputOutputPairs) {
-   val map = Map("key" -> value)
-   BatchExporter.makeLabelString(map) shouldEqual expected
+ it("should give correct export schema") {
Contributor:

Lovely change, so much simpler.

val tableSchema = {
// NOTE: ArrayBuffers are sometimes used instead of Seq's because otherwise
// ArrayIndexOutOfBoundsExceptions occur when Spark exports a batch.
val fields = new mutable.ArrayBuffer[StructField](labelColumnMapping.length + 9)
Contributor:

Nit: add a comment on why we are adding +9. It might be better to define this as a constant and describe it there.

Contributor:

Added a comment in the latest commit.
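
For reference, a sketch of the constant-based alternative the reviewer suggested (the merged change went with a comment instead; the constant name here is made up, and the fragment mirrors the snippet above):

// Number of fixed (non-label) columns in the export schema; the dynamic
// label-based columns from labelColumnMapping are appended after these.
val NumFixedColumns = 9
val fields = new mutable.ArrayBuffer[StructField](labelColumnMapping.length + NumFixedColumns)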

val tableName = group.as[String]("table")
val tablePath = group.as[String]("table-path")
val labelColumnMapping = group.as[Seq[String]]("label-column-mapping")
.sliding(2, 2).map(seq => (seq.head, seq.last)).toSeq
Contributor:

For my understanding, what does sliding(2, 2) do?

Contributor:

It creates size-2 pairs from the config defined under the label-column-mapping key. Let me add an example.

@nikitag55 (Contributor) commented Feb 23, 2024:

Added comments in the latest commit to explain the sliding call and the creation of labelColumnMapping as Seq[(a,b), (c,d)].
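
For reference, a small worked example of the sliding(2, 2) call (the label and column names are hypothetical):

// sliding(2, 2) yields non-overlapping windows of size 2, so a flat list of
// alternating names becomes a Seq of (label, column) pairs.
val labelColumnMapping = Seq("_ws_", "workspace", "_ns_", "namespace")
  .sliding(2, 2).map(seq => (seq.head, seq.last)).toSeq
// labelColumnMapping == Seq(("_ws_", "workspace"), ("_ns_", "namespace"))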

headTask ++ tailTasks
}
// export/downsample RDDs in parallel
exportTasks.par.foreach(_.apply())
Contributor:

Should we add a config to control the degree of parallelism? What is the default behavior?
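
For reference: by default, .par runs tasks on Scala's default pool, which is sized to the number of available cores. A config-driven degree of parallelism could be wired in like the sketch below (not part of this PR; exportParallelism is a hypothetical setting):

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val exportParallelism = 4  // hypothetical: could be read from DownsamplerSettings
val parTasks = exportTasks.par
// Replace the default task support with a pool of the configured size.
parTasks.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(exportParallelism))
parTasks.foreach(_.apply())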

@@ -75,6 +76,40 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
lazy val exportLatency =
Kamon.histogram("export-latency", MeasurementUnit.time.milliseconds).withoutTags()

/**
* Exports an RDD for a specific export key.
Contributor:

kudos on the func doc 👍

nikitag55 previously approved these changes Feb 23, 2024
@alextheimer alextheimer merged commit 745eb47 into develop Feb 26, 2024
1 check passed
alextheimer added a commit to alextheimer/FiloDB that referenced this pull request Mar 13, 2024
feat(downsample): support export to multiple paths in Iceberg format (filodb#1720)

Adds support for export from the downsampler job to multiple destinations in the Iceberg format.
Support for export of CSV-formatted data is removed.

---------

Co-authored-by: nikitag55 <[email protected]>