From d950f32bc41ecd33df9027434e18a0bba67340ff Mon Sep 17 00:00:00 2001
From: nikitag55
Date: Tue, 27 Feb 2024 09:54:49 -0800
Subject: [PATCH] misc(spark-job-iceberg-data-export) Assign correct NULL/NOT NULL type to columns in Iceberg Data Export S… (#1721)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Assign correct NULL/NOT NULL type to columns in Iceberg Data Export Schema

* remove false/true value from labelColumn Mapping and determine it dynamically

* minor refactor
---
 core/src/main/resources/filodb-defaults.conf    |  7 ++-
 .../downsampler/chunk/BatchExporter.scala       | 48 +++++++-------
 .../chunk/DownsamplerSettings.scala             | 30 +++++----
 .../downsampler/DownsamplerMainSpec.scala       | 62 +++++++++----------
 4 files changed, 79 insertions(+), 68 deletions(-)

diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf
index be947152dc..dbfa2bb4a5 100644
--- a/core/src/main/resources/filodb-defaults.conf
+++ b/core/src/main/resources/filodb-defaults.conf
@@ -619,10 +619,11 @@ filodb {
       table-path = "s3a://////ws-foo"
       # to add additional dynamic label-based columns to the table
       # Eg: _ws_ is the label key in time series to populate column workspace
-      # Similary, _ns_ is the label key in time series to populate column namespace
+      # the 3rd param specifies whether the column is NOT NULL or NULL
+      # Similarly, _ns_ is the label key in time series to populate column namespace
       label-column-mapping = [
-        "_ws_", "workspace",
-        "_ns_", "namespace"
+        "_ws_", "workspace", "NOT NULL",
+        "_ns_", "namespace", "NOT NULL"
       ]
       # Partition Iceberg Table by any of the col from label-column-mapping
       partition-by-columns = ["namespace"]
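A quick sketch of the new contract for reviewers: label-column-mapping is now a flat list whose
entries come in threes (label key, column name, nullability keyword). The snippet below is
illustrative only; the values are the defaults shown above, and the real parsing lives in
DownsamplerSettings further down in this patch.

    // Group the flat config list into (label key, column name, nullability) triples.
    val raw: Seq[String] = Seq(
      "_ws_", "workspace", "NOT NULL",
      "_ns_", "namespace", "NOT NULL")
    val mapping: Seq[(String, String, String)] =
      raw.sliding(3, 3).collect { case Seq(label, column, nullability) =>
        (label, column, nullability)
      }.toSeq
    // mapping == Seq(("_ws_", "workspace", "NOT NULL"), ("_ns_", "namespace", "NOT NULL"))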
diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
index 7c6f71ae1c..297f1b43eb 100644
--- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
+++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
@@ -28,8 +28,9 @@ case class ExportTableConfig(tableName: String,
                              tableSchema: StructType,
                              tablePath: String,
                              exportRules: Seq[ExportRule],
-                             labelColumnMapping: Seq[(String, String)],
+                             labelColumnMapping: Seq[(String, String, String)],
                              partitionByCols: Seq[String])
+
 /**
  * All info needed to output a result Spark Row.
  */
@@ -59,7 +60,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
   /**
    * Returns the index of a column in the export schema.
    * E.g. "foo" will return `3` if "foo" is the name of the fourth column (zero-indexed)
-   * of each exported row.
+   * of each exported row.
    */
   def getColumnIndex(colName: String, exportTableConfig: ExportTableConfig): Option[Int] = {
     // check if fieldName is from dynamic schema
@@ -79,7 +80,8 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

   /**
    * Returns an iterator of a single column of the argument partition's chunk.
-   * @param iCol the index of the column to iterate.
+   *
+   * @param iCol the index of the column to iterate.
    * @param iRowStart the index of the row to begin iteration from.
    */
   private def getChunkColIter(partition: ReadablePartition,
@@ -96,7 +98,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
    */
   private def matchAllFilters(filters: Seq[ColumnFilter],
                               labels: collection.Map[String, String]): Boolean = {
-    filters.forall( filt =>
+    filters.forall(filt =>
       labels.get(filt.column)
         .exists(value => filt.filter.filterFunc(value))
     )
@@ -141,7 +143,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
   private def exportDataToRow(exportData: ExportRowData, exportTableConfig: ExportTableConfig): Row = {
     val dataSeq = new mutable.ArrayBuffer[Any](exportTableConfig.tableSchema.fields.length)
     // append all dynamic column values
-    exportTableConfig.labelColumnMapping.foreach {pair => dataSeq.append(exportData.labels.get(pair._1))}
+    exportTableConfig.labelColumnMapping.foreach { pair => dataSeq.append(exportData.labels.get(pair._1)) }
     // append all fixed column values
     dataSeq.append(
       exportData.metric,
@@ -177,20 +179,21 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

   private def sqlCreateTable(catalog: String, database: String, exportTableConfig: ExportTableConfig) = {
     // create dynamic col names with type to append to create table statement
-    val dynamicColNames = exportTableConfig.labelColumnMapping.map(pair => pair._2 + " string").mkString(", ")
+    val dynamicColNames = exportTableConfig.labelColumnMapping.map(pair =>
+      pair._2 + " string " + pair._3).mkString(", ")
     val partitionColNames = exportTableConfig.partitionByCols.mkString(", ")
     s"""
        |CREATE TABLE IF NOT EXISTS $catalog.$database.${exportTableConfig.tableName} (
        |  $dynamicColNames,
-       |  metric string,
+       |  metric string NOT NULL,
        |  labels map<string, string>,
-       |  epoch_timestamp long,
-       |  timestamp timestamp,
+       |  epoch_timestamp long NOT NULL,
+       |  timestamp timestamp NOT NULL,
        |  value double,
-       |  year int,
-       |  month int,
-       |  day int,
-       |  hour int
+       |  year int NOT NULL,
+       |  month int NOT NULL,
+       |  day int NOT NULL,
+       |  hour int NOT NULL
        |  )
        | USING iceberg
        | PARTITIONED BY (year, month, day, $partitionColNames, metric)
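For context on the DDL change above, this is roughly what the dynamic column fragment expands to
for the default two-column mapping. The statement itself is still assembled by sqlCreateTable at
runtime, so this is a sketch of the intermediate value, not verbatim output.

    // Sketch: what the map(...) above produces for the default mapping.
    val labelColumnMapping = Seq(("_ws_", "workspace", "NOT NULL"), ("_ns_", "namespace", "NOT NULL"))
    val dynamicColNames = labelColumnMapping.map(pair => pair._2 + " string " + pair._3).mkString(", ")
    // dynamicColNames == "workspace string NOT NULL, namespace string NOT NULL"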
@@ -205,7 +208,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
         partKeyMap.get(label).contains(key(i))
       }
     }.flatMap { case (key, exportTableConfig) =>
-      exportTableConfig.exportRules.takeWhile{ rule =>
+      exportTableConfig.exportRules.takeWhile { rule =>
         // step through rules while we still haven't matched a "block" filter
         !rule.blockFilterGroups.exists { filterGroup =>
           matchAllFilters(filterGroup, partKeyMap)
@@ -220,6 +223,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
   }

   // scalastyle:off method.length
+
   /**
    * Returns data about a single row to export.
    * exportDataToRow will convert these into Spark Rows that conform to this.exportSchema.
@@ -237,10 +241,10 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
     downsamplerSettings.exportDropLabels.foreach(drop => partKeyMap.remove(drop))
     rule.dropLabels.foreach(drop => partKeyMap.remove(drop))

-    val timestampCol = 0 // FIXME: need a more dynamic (but efficient) solution
+    val timestampCol = 0  // FIXME: need a more dynamic (but efficient) solution
     val columns = partition.schema.data.columns
     val valueCol = columns.indexWhere(_.name == partition.schema.data.valueColName)
-    val rangeInfoIter = getChunkRangeIter(partition, userStartTime, userEndTime).map{ chunkRange =>
+    val rangeInfoIter = getChunkRangeIter(partition, userStartTime, userEndTime).map { chunkRange =>
       val infoReader = chunkRange.chunkSetInfoReader
       val irowStart = chunkRange.istartRow
       val irowEnd = chunkRange.iendRow
@@ -253,9 +257,9 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
     }
     val tupleIter = columns(valueCol).columnType match {
       case DoubleColumn =>
-        rangeInfoIter.flatMap{ info =>
+        rangeInfoIter.flatMap { info =>
           val doubleIter = info.valueIter.asDoubleIt
-          (info.irowStart to info.irowEnd).iterator.map{ _ =>
+          (info.irowStart to info.irowEnd).iterator.map { _ =>
             (partKeyMap, info.timestampIter.next, doubleIter.next)
           }
         }
@@ -276,12 +280,12 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
         partKeyMap.remove(LABEL_NAME)
         partKeyMap.put(LABEL_NAME, nameWithoutBucket)

-        rangeInfoIter.flatMap{ info =>
+        rangeInfoIter.flatMap { info =>
           val histIter = info.valueIter.asHistIt
-          (info.irowStart to info.irowEnd).iterator.flatMap{ _ =>
+          (info.irowStart to info.irowEnd).iterator.flatMap { _ =>
             val hist = histIter.next()
             val timestamp = info.timestampIter.next
-            (0 until hist.numBuckets).iterator.map{ i =>
+            (0 until hist.numBuckets).iterator.map { i =>
               val bucketTopString = {
                 val raw = hist.bucketTop(i)
                 if (raw.isPosInfinity) "+Inf" else raw.toString
@@ -303,7 +307,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
       numPartitionsExportPrepped.increment()
     }

-    tupleIter.map{ case (labels, epoch_timestamp, value) =>
+    tupleIter.map { case (labels, epoch_timestamp, value) =>
       val metric = labels(LABEL_NAME)
       // to compute YYYY, MM, dd, hh
      // to compute readable timestamp from unix timestamp
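The comments in the last hunk mention computing YYYY, MM, dd, hh and a readable timestamp from the
epoch timestamp; the conversion code itself is outside the context lines shown here. The sketch
below is one plausible way to derive those values with java.time and is not taken from the patch.

    import java.time.{Instant, ZoneOffset}

    // Sample epoch value (2024-02-27T16:00:00Z); the exporter gets this from the chunk iterator.
    val epochMillis = 1709049600000L
    val utc = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC)
    val (year, month, day, hour) = (utc.getYear, utc.getMonthValue, utc.getDayOfMonth, utc.getHour)
    val readableTimestamp = java.sql.Timestamp.from(utc.toInstant)  // feeds the timestamp column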
diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala
index b61952a344..65adcfb034 100644
--- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala
+++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala
@@ -26,7 +26,7 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ

   @transient lazy val downsamplerConfig = {
     val conf = filodbConfig.getConfig("downsampler")
-    DownsamplerContext.dsLogger.info(s"Loaded following downsampler config: ${conf.root().render()}" )
+    DownsamplerContext.dsLogger.info(s"Loaded following downsampler config: ${conf.root().render()}")
     conf
   }

@@ -95,8 +95,9 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ
     // below creates Seq[(_ws_,workspace), (_ns_,namespace)] ["_ws_", "workspace", "_ns_", "namespace"]
     // above means _ws_ label key in time series will be used to populate column workspace
     // Similarly, _ns_ label key in time series will be used to populate column namespace
+    // the 3rd param specifies whether the column is NULL or NOT NULL
     val labelColumnMapping = group.as[Seq[String]]("label-column-mapping")
-      .sliding(2, 2).map(seq => (seq.head, seq.last)).toSeq
+      .sliding(3, 3).map(seq => (seq.apply(0), seq.apply(1), seq.apply(2))).toSeq
     // Constructs dynamic exportSchema as per ExportTableConfig.
     // final schema is a combination of columns defined in conf file plus some
     // additional standardized columns
@@ -107,28 +108,33 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ
       // + no. of dynamic cols in labelColumnMapping.length
       val fields = new mutable.ArrayBuffer[StructField](labelColumnMapping.length + 9)
       // append all dynamic columns as StringType from conf
-      labelColumnMapping.foreach { pair => fields.append(StructField(pair._2, StringType)) }
+      labelColumnMapping.foreach { pair =>
+        if (pair._3 == "NOT NULL")
+          fields.append(StructField(pair._2, StringType, false))
+        else
+          fields.append(StructField(pair._2, StringType, true))
+      }
       // append all fixed columns
       fields.append(
-        StructField(COL_METRIC, StringType),
+        StructField(COL_METRIC, StringType, false),
         StructField(COL_LABELS, StringType),
-        StructField(COL_EPOCH_TIMESTAMP, LongType),
-        StructField(COL_TIMESTAMP, TimestampType),
+        StructField(COL_EPOCH_TIMESTAMP, LongType, false),
+        StructField(COL_TIMESTAMP, TimestampType, false),
         StructField(COL_VALUE, DoubleType),
-        StructField(COL_YEAR, IntegerType),
-        StructField(COL_MONTH, IntegerType),
-        StructField(COL_DAY, IntegerType),
-        StructField(COL_HOUR, IntegerType)
+        StructField(COL_YEAR, IntegerType, false),
+        StructField(COL_MONTH, IntegerType, false),
+        StructField(COL_DAY, IntegerType, false),
+        StructField(COL_HOUR, IntegerType, false)
       )
       StructType(fields)
     }
     val partitionByCols = group.as[Seq[String]]("partition-by-columns")
     val rules = group.as[Seq[Config]]("rules").map { rule =>
-      val allowFilterGroups = rule.as[Seq[Seq[String]]]("allow-filters").map{ group =>
+      val allowFilterGroups = rule.as[Seq[Seq[String]]]("allow-filters").map { group =>
         Parser.parseQuery(s"{${group.mkString(",")}}")
           .asInstanceOf[InstantExpression].getUnvalidatedColumnFilters()
       }
-      val blockFilterGroups = rule.as[Seq[Seq[String]]]("block-filters").map{ group =>
+      val blockFilterGroups = rule.as[Seq[Seq[String]]]("block-filters").map { group =>
         Parser.parseQuery(s"{${group.mkString(",")}}")
           .asInstanceOf[InstantExpression].getUnvalidatedColumnFilters()
       }
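The if/else that assigns nullability above can also be written as a single expression, since the
third argument of StructField is the nullable flag. The sketch below shows that compact equivalent;
it is not the code the patch uses, and the mapping values are just the defaults from
filodb-defaults.conf.

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val labelColumnMapping = Seq(("_ws_", "workspace", "NOT NULL"), ("_ns_", "namespace", "NOT NULL"))
    val dynamicFields = labelColumnMapping.map { case (_, column, nullability) =>
      // "NOT NULL" in the config maps to nullable = false in the Spark schema
      StructField(column, StringType, nullable = nullability != "NOT NULL")
    }
    val dynamicSchema = StructType(dynamicFields)
    // dynamicSchema("workspace").nullable == false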
diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
index 2159f8c061..504731aa0f 100644
--- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
+++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
@@ -67,8 +67,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |        "table": "",
        |        "table-path": "${exportToFile.getOrElse("")}",
        |        "label-column-mapping": [
-       |          "_ws_", "workspace",
-       |          "_ns_", "namespace"
+       |          "_ws_", "workspace", "NOT NULL",
+       |          "_ns_", "namespace", "NOT NULL"
        |        ]
        |        "partition-by-columns": ["namespace"]
        |        "rules": [
@@ -191,8 +191,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a",
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -218,8 +218,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -250,8 +250,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -286,8 +286,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -318,8 +318,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -345,8 +345,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -362,8 +362,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1b"
        |            table-path: "s3a://bucket/directory/catalog/database/l1b"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -389,8 +389,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "l1a"
        |            table-path = "s3a://bucket/directory/catalog/database/l1a"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -470,8 +470,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "my_ws"
        |            table-path = "s3a://bucket/directory/catalog/database/my_ws"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -505,8 +505,8 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
        |            table = "my_ws"
        |            table-path = "s3a://bucket/directory/catalog/database/my_ws"
        |            label-column-mapping = [
-       |              "_ws_", "workspace",
-       |              "_ns_", "namespace"
+       |              "_ws_", "workspace", "NOT NULL",
+       |              "_ns_", "namespace", "NOT NULL"
        |            ]
        |            partition-by-columns = ["namespace"]
        |            rules = [
@@ -527,17 +527,17 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
     val exportSchema = {
       val fields = new scala.collection.mutable.ArrayBuffer[StructField](11)
       fields.append(
-        StructField("workspace", StringType),
-        StructField("namespace", StringType),
-        StructField("metric", StringType),
+        StructField("workspace", StringType, false),
+        StructField("namespace", StringType, false),
+        StructField("metric", StringType, false),
         StructField("labels", StringType),
-        StructField("epoch_timestamp", LongType),
-        StructField("timestamp", TimestampType),
+        StructField("epoch_timestamp", LongType, false),
+        StructField("timestamp", TimestampType, false),
         StructField("value", DoubleType),
-        StructField("year", IntegerType),
-        StructField("month", IntegerType),
-        StructField("day", IntegerType),
-        StructField("hour", IntegerType))
+        StructField("year", IntegerType, false),
+        StructField("month", IntegerType, false),
+        StructField("day", IntegerType, false),
+        StructField("hour", IntegerType, false))
       StructType(fields)
     }
     pairExportKeyTableConfig._2.tableSchema shouldEqual exportSchema
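A note on why the expected test schema had to change at all: StructField is a case class, so the
schema equality exercised by shouldEqual includes the nullable flag. A minimal illustration, not
part of the test file:

    import org.apache.spark.sql.types.{StringType, StructField}

    // Two fields that differ only in nullability are not equal, so the expected schema
    // must mark the NOT NULL columns with nullable = false for the assertion to pass.
    StructField("workspace", StringType, nullable = false) == StructField("workspace", StringType, nullable = true)
    // evaluates to false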