misc(spark-job-iceberg-data-export) Assign correct NULL/NOT NULL type to columns in Iceberg Data Export S… (#1721)

* Assign correct NULL/NOT NULL type to columns in Iceberg Data Export Schema

* Remove false/true value from labelColumnMapping and determine it dynamically

* minor refactor
nikitag55 authored Feb 27, 2024
1 parent 745eb47 commit d950f32
Showing 4 changed files with 79 additions and 68 deletions.
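Before the per-file diff, here is a minimal, self-contained sketch of what the change amounts to (an illustration using the default mapping values from filodb-defaults.conf shown below, not the commit's code verbatim): the label-column-mapping entries are now read as (label key, column name, nullability) triples, and each triple becomes a Spark StructField whose nullable flag is derived from the third element.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Flat config values as they appear in label-column-mapping (defaults shown below).
val labelColumnMapping: Seq[(String, String, String)] =
  Seq("_ws_", "workspace", "NOT NULL", "_ns_", "namespace", "NOT NULL")
    .sliding(3, 3)
    .map(s => (s(0), s(1), s(2)))
    .toSeq

// Each (labelKey, columnName, nullability) triple becomes a StringType column whose
// nullable flag follows the third element.
val dynamicFields = labelColumnMapping.map { case (_, colName, nullability) =>
  StructField(colName, StringType, nullable = nullability != "NOT NULL")
}
StructType(dynamicFields)
// => StructType(StructField(workspace,StringType,false), StructField(namespace,StringType,false))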
core/src/main/resources/filodb-defaults.conf (7 changes: 4 additions & 3 deletions)
@@ -619,10 +619,11 @@ filodb {
table-path = "s3a://<bucket>/<directory>/<catalog>/<database>/ws-foo"
# to add additional dynamic label-based columns to the table
# Eg: _ws_ is the label key in time series to populate column workspace
# Similary, _ns_ is the label key in time series to populate column namespace
# 3rd param NOT NULL/NULL, specifies column can be NOT NULL or NULL
# Similarly, _ns_ is the label key in time series to populate column namespace
label-column-mapping = [
"_ws_", "workspace",
"_ns_", "namespace"
"_ws_", "workspace", "NOT NULL",
"_ns_", "namespace", "NOT NULL"
]
# Partition Iceberg Table by any of the col from label-column-mapping
partition-by-columns = ["namespace"]
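As a quick illustration of the mapping semantics described in the comments above (the label values here are hypothetical), each exported time series contributes exactly one value per dynamic column, looked up by its label key; this mirrors how exportDataToRow appends the dynamic column values in BatchExporter further down in this commit.

val labelColumnMapping = Seq(("_ws_", "workspace", "NOT NULL"), ("_ns_", "namespace", "NOT NULL"))
val seriesLabels = Map("_ws_" -> "demo-workspace", "_ns_" -> "demo-namespace")

// Look up one value per dynamic column, in mapping order.
val dynamicColumnValues = labelColumnMapping.map { case (labelKey, _, _) => seriesLabels.get(labelKey) }
// => Seq(Some("demo-workspace"), Some("demo-namespace")), written to columns workspace and namespace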
@@ -28,8 +28,9 @@ case class ExportTableConfig(tableName: String,
tableSchema: StructType,
tablePath: String,
exportRules: Seq[ExportRule],
labelColumnMapping: Seq[(String, String)],
labelColumnMapping: Seq[(String, String, String)],
partitionByCols: Seq[String])

/**
* All info needed to output a result Spark Row.
*/
@@ -59,7 +60,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
/**
* Returns the index of a column in the export schema.
* E.g. "foo" will return `3` if "foo" is the name of the fourth column (zero-indexed)
* of each exported row.
* of each exported row.
*/
def getColumnIndex(colName: String, exportTableConfig: ExportTableConfig): Option[Int] = {
// check if fieldName is from dynamic schema
@@ -79,7 +80,8 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

/**
* Returns an iterator of a single column of the argument partition's chunk.
* @param iCol the index of the column to iterate.
*
* @param iCol the index of the column to iterate.
* @param iRowStart the index of the row to begin iteration from.
*/
private def getChunkColIter(partition: ReadablePartition,
@@ -96,7 +98,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
*/
private def matchAllFilters(filters: Seq[ColumnFilter],
labels: collection.Map[String, String]): Boolean = {
filters.forall( filt =>
filters.forall(filt =>
labels.get(filt.column)
.exists(value => filt.filter.filterFunc(value))
)
@@ -141,7 +143,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
private def exportDataToRow(exportData: ExportRowData, exportTableConfig: ExportTableConfig): Row = {
val dataSeq = new mutable.ArrayBuffer[Any](exportTableConfig.tableSchema.fields.length)
// append all dynamic column values
exportTableConfig.labelColumnMapping.foreach {pair => dataSeq.append(exportData.labels.get(pair._1))}
exportTableConfig.labelColumnMapping.foreach { pair => dataSeq.append(exportData.labels.get(pair._1)) }
// append all fixed column values
dataSeq.append(
exportData.metric,
@@ -177,20 +179,21 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

private def sqlCreateTable(catalog: String, database: String, exportTableConfig: ExportTableConfig) = {
// create dynamic col names with type to append to create table statement
val dynamicColNames = exportTableConfig.labelColumnMapping.map(pair => pair._2 + " string").mkString(", ")
val dynamicColNames = exportTableConfig.labelColumnMapping.map(pair =>
pair._2 + " string " + pair._3).mkString(", ")
val partitionColNames = exportTableConfig.partitionByCols.mkString(", ")
s"""
|CREATE TABLE IF NOT EXISTS $catalog.$database.${exportTableConfig.tableName} (
| $dynamicColNames,
| metric string,
| metric string NOT NULL,
| labels map<string, string>,
| epoch_timestamp long,
| timestamp timestamp,
| epoch_timestamp long NOT NULL,
| timestamp timestamp NOT NULL,
| value double,
| year int,
| month int,
| day int,
| hour int
| year int NOT NULL,
| month int NOT NULL,
| day int NOT NULL,
| hour int NOT NULL
| )
| USING iceberg
| PARTITIONED BY (year, month, day, $partitionColNames, metric)
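For reference, a sketch of how the dynamic-column fragment built above renders with the default mapping from filodb-defaults.conf; the catalog, database, and table names are placeholders, and only the start of the generated DDL is spelled out.

val labelColumnMapping = Seq(("_ws_", "workspace", "NOT NULL"), ("_ns_", "namespace", "NOT NULL"))
val dynamicColNames = labelColumnMapping.map(pair => pair._2 + " string " + pair._3).mkString(", ")
// dynamicColNames == "workspace string NOT NULL, namespace string NOT NULL"
// so the CREATE TABLE statement begins roughly as:
//   CREATE TABLE IF NOT EXISTS <catalog>.<database>.<table> (
//     workspace string NOT NULL,
//     namespace string NOT NULL,
//     metric string NOT NULL,
//     ...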
@@ -205,7 +208,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
partKeyMap.get(label).contains(key(i))
}
}.flatMap { case (key, exportTableConfig) =>
exportTableConfig.exportRules.takeWhile{ rule =>
exportTableConfig.exportRules.takeWhile { rule =>
// step through rules while we still haven't matched a "block" filter
!rule.blockFilterGroups.exists { filterGroup =>
matchAllFilters(filterGroup, partKeyMap)
@@ -220,6 +223,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}

// scalastyle:off method.length

/**
* Returns data about a single row to export.
* exportDataToRow will convert these into Spark Rows that conform to this.exportSchema.
@@ -237,10 +241,10 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
downsamplerSettings.exportDropLabels.foreach(drop => partKeyMap.remove(drop))
rule.dropLabels.foreach(drop => partKeyMap.remove(drop))

val timestampCol = 0 // FIXME: need a more dynamic (but efficient) solution
val timestampCol = 0 // FIXME: need a more dynamic (but efficient) solution
val columns = partition.schema.data.columns
val valueCol = columns.indexWhere(_.name == partition.schema.data.valueColName)
val rangeInfoIter = getChunkRangeIter(partition, userStartTime, userEndTime).map{ chunkRange =>
val rangeInfoIter = getChunkRangeIter(partition, userStartTime, userEndTime).map { chunkRange =>
val infoReader = chunkRange.chunkSetInfoReader
val irowStart = chunkRange.istartRow
val irowEnd = chunkRange.iendRow
@@ -253,9 +257,9 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
val tupleIter = columns(valueCol).columnType match {
case DoubleColumn =>
rangeInfoIter.flatMap{ info =>
rangeInfoIter.flatMap { info =>
val doubleIter = info.valueIter.asDoubleIt
(info.irowStart to info.irowEnd).iterator.map{ _ =>
(info.irowStart to info.irowEnd).iterator.map { _ =>
(partKeyMap, info.timestampIter.next, doubleIter.next)
}
}
@@ -276,12 +280,12 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
partKeyMap.remove(LABEL_NAME)
partKeyMap.put(LABEL_NAME, nameWithoutBucket)

rangeInfoIter.flatMap{ info =>
rangeInfoIter.flatMap { info =>
val histIter = info.valueIter.asHistIt
(info.irowStart to info.irowEnd).iterator.flatMap{ _ =>
(info.irowStart to info.irowEnd).iterator.flatMap { _ =>
val hist = histIter.next()
val timestamp = info.timestampIter.next
(0 until hist.numBuckets).iterator.map{ i =>
(0 until hist.numBuckets).iterator.map { i =>
val bucketTopString = {
val raw = hist.bucketTop(i)
if (raw.isPosInfinity) "+Inf" else raw.toString
Expand All @@ -303,7 +307,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
numPartitionsExportPrepped.increment()
}

tupleIter.map{ case (labels, epoch_timestamp, value) =>
tupleIter.map { case (labels, epoch_timestamp, value) =>
val metric = labels(LABEL_NAME)
// to compute YYYY, MM, dd, hh
// to compute readable timestamp from unix timestamp
@@ -26,7 +26,7 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ

@transient lazy val downsamplerConfig = {
val conf = filodbConfig.getConfig("downsampler")
DownsamplerContext.dsLogger.info(s"Loaded following downsampler config: ${conf.root().render()}" )
DownsamplerContext.dsLogger.info(s"Loaded following downsampler config: ${conf.root().render()}")
conf
}

@@ -95,8 +95,9 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ
// below creates Seq[(_ws_,workspace), (_ns_,namespace)] ["_ws_", "workspace", "_ns_", "namespace"]
// above means _ws_ label key in time series will be used to populate column workspace
// Similarly, _ns_ label key in time series will be used to populate column namespace
// # 3rd param NOT NULL, specifies column can be NULL or NOT NULL
val labelColumnMapping = group.as[Seq[String]]("label-column-mapping")
.sliding(2, 2).map(seq => (seq.head, seq.last)).toSeq
.sliding(3, 3).map(seq => (seq.apply(0), seq.apply(1), seq.apply(2))).toSeq
// Constructs dynamic exportSchema as per ExportTableConfig.
// final schema is a combination of columns defined in conf file plus some
// additional standardized columns
@@ -107,28 +108,33 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ
// + no. of dynamic cols in labelColumnMapping.length
val fields = new mutable.ArrayBuffer[StructField](labelColumnMapping.length + 9)
// append all dynamic columns as StringType from conf
labelColumnMapping.foreach { pair => fields.append(StructField(pair._2, StringType)) }
labelColumnMapping.foreach { pair =>
if (pair._3 == "NOT NULL")
fields.append(StructField(pair._2, StringType, false))
else
fields.append(StructField(pair._2, StringType, true))
}
// append all fixed columns
fields.append(
StructField(COL_METRIC, StringType),
StructField(COL_METRIC, StringType, false),
StructField(COL_LABELS, StringType),
StructField(COL_EPOCH_TIMESTAMP, LongType),
StructField(COL_TIMESTAMP, TimestampType),
StructField(COL_EPOCH_TIMESTAMP, LongType, false),
StructField(COL_TIMESTAMP, TimestampType, false),
StructField(COL_VALUE, DoubleType),
StructField(COL_YEAR, IntegerType),
StructField(COL_MONTH, IntegerType),
StructField(COL_DAY, IntegerType),
StructField(COL_HOUR, IntegerType)
StructField(COL_YEAR, IntegerType, false),
StructField(COL_MONTH, IntegerType, false),
StructField(COL_DAY, IntegerType, false),
StructField(COL_HOUR, IntegerType, false)
)
StructType(fields)
}
val partitionByCols = group.as[Seq[String]]("partition-by-columns")
val rules = group.as[Seq[Config]]("rules").map { rule =>
val allowFilterGroups = rule.as[Seq[Seq[String]]]("allow-filters").map{ group =>
val allowFilterGroups = rule.as[Seq[Seq[String]]]("allow-filters").map { group =>
Parser.parseQuery(s"{${group.mkString(",")}}")
.asInstanceOf[InstantExpression].getUnvalidatedColumnFilters()
}
val blockFilterGroups = rule.as[Seq[Seq[String]]]("block-filters").map{ group =>
val blockFilterGroups = rule.as[Seq[Seq[String]]]("block-filters").map { group =>
Parser.parseQuery(s"{${group.mkString(",")}}")
.asInstanceOf[InstantExpression].getUnvalidatedColumnFilters()
}
@@ -67,8 +67,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| "table": "",
| "table-path": "${exportToFile.getOrElse("")}",
| "label-column-mapping": [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| "partition-by-columns": ["namespace"]
| "rules": [
@@ -191,8 +191,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a",
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -218,8 +218,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -250,8 +250,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -286,8 +286,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -318,8 +318,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -345,8 +345,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -362,8 +362,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1b"
| table-path: "s3a://bucket/directory/catalog/database/l1b"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -389,8 +389,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "l1a"
| table-path = "s3a://bucket/directory/catalog/database/l1a"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -470,8 +470,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "my_ws"
| table-path = "s3a://bucket/directory/catalog/database/my_ws"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -505,8 +505,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
| table = "my_ws"
| table-path = "s3a://bucket/directory/catalog/database/my_ws"
| label-column-mapping = [
| "_ws_", "workspace",
| "_ns_", "namespace"
| "_ws_", "workspace", "NOT NULL",
| "_ns_", "namespace", "NOT NULL"
| ]
| partition-by-columns = ["namespace"]
| rules = [
@@ -527,17 +527,17 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
val exportSchema = {
val fields = new scala.collection.mutable.ArrayBuffer[StructField](11)
fields.append(
StructField("workspace", StringType),
StructField("namespace", StringType),
StructField("metric", StringType),
StructField("workspace", StringType, false),
StructField("namespace", StringType, false),
StructField("metric", StringType, false),
StructField("labels", StringType),
StructField("epoch_timestamp", LongType),
StructField("timestamp", TimestampType),
StructField("epoch_timestamp", LongType, false),
StructField("timestamp", TimestampType, false),
StructField("value", DoubleType),
StructField("year", IntegerType),
StructField("month", IntegerType),
StructField("day", IntegerType),
StructField("hour", IntegerType))
StructField("year", IntegerType, false),
StructField("month", IntegerType, false),
StructField("day", IntegerType, false),
StructField("hour", IntegerType, false))
StructType(fields)
}
pairExportKeyTableConfig._2.tableSchema shouldEqual exportSchema
