From 40376755b6208fb0f7316f22ff8d9a25818b1b88 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Thu, 29 Feb 2024 10:11:10 -0800 Subject: [PATCH 1/2] fix(downsample): remove Option wrapper around export values --- core/src/main/resources/filodb-defaults.conf | 1 + .../downsampler/chunk/BatchExporter.scala | 39 ++++++++++++++----- .../chunk/DownsamplerSettings.scala | 2 + .../downsampler/DownsamplerMainSpec.scala | 5 +-- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/core/src/main/resources/filodb-defaults.conf b/core/src/main/resources/filodb-defaults.conf index dbfa2bb4a5..b65a8ce8df 100644 --- a/core/src/main/resources/filodb-defaults.conf +++ b/core/src/main/resources/filodb-defaults.conf @@ -588,6 +588,7 @@ filodb { data-export { enabled = false + log-all-row-errors = true parallelism = 10 # Catalog under Unified Catalog catalog = "" diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala index 3dde39cbcf..af4a61c5af 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala @@ -1,7 +1,7 @@ package filodb.downsampler.chunk import java.security.MessageDigest -import java.time.{Instant, LocalDateTime, ZoneId} +import java.time.{Instant, ZoneId} import kamon.Kamon import org.apache.spark.rdd.RDD @@ -37,7 +37,7 @@ case class ExportTableConfig(tableName: String, case class ExportRowData(metric: String, labels: collection.Map[String, String], epoch_timestamp: Long, - timestamp: LocalDateTime, + timestamp: java.sql.Timestamp, value: Double, year: Int, month: Int, @@ -55,6 +55,8 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime @transient lazy val numRowsExportPrepped = Kamon.counter("num-rows-export-prepped").withoutTags() + @transient lazy val numRowExportPrepErrors = Kamon.counter("num-row-export-prep-errors").withoutTags() + val keyToRules = downsamplerSettings.exportKeyToRules /** @@ -131,9 +133,21 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime getExportData(part, partKeyMap, rule.get) } .map { exportData => - numRowsExportPrepped.increment() - exportDataToRow(exportData, exportTableConfig) + try { + val row = exportDataToRow(exportData, exportTableConfig) + numRowsExportPrepped.increment() + Some(row) + } catch { + case t: Throwable => + if (downsamplerSettings.logAllRowErrors) { + DownsamplerContext.dsLogger.error(s"error during exportDataToRow: $exportData", t) + } + numRowExportPrepErrors.increment() + None + } } + .filter(_.isDefined) + .map(_.get) } /** @@ -143,7 +157,11 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime private def exportDataToRow(exportData: ExportRowData, exportTableConfig: ExportTableConfig): Row = { val dataSeq = new mutable.ArrayBuffer[Any](exportTableConfig.tableSchema.fields.length) // append all dynamic column values - exportTableConfig.labelColumnMapping.foreach { pair => dataSeq.append(exportData.labels.get(pair._1)) } + exportTableConfig.labelColumnMapping.foreach { pair => + val labelValue = exportData.labels.get(pair._1) + assert(labelValue.isDefined, s"${pair._1} label was expected but not found: ${exportData.labels}") + dataSeq.append(labelValue.get) + } // append all fixed column values dataSeq.append( exportData.metric, @@ -311,11 +329,12 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime val metric = labels(LABEL_NAME) // to compute YYYY, MM, dd, hh // to compute readable timestamp from unix timestamp - val timestamp = Instant.ofEpochMilli(epoch_timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime() - val year = timestamp.getYear() - val month = timestamp.getMonthValue() - val day = timestamp.getDayOfMonth() - val hour = timestamp.getHour() + val dateTime = Instant.ofEpochMilli(epoch_timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime() + val timestamp = java.sql.Timestamp.valueOf(dateTime) + val year = dateTime.getYear() + val month = dateTime.getMonthValue() + val day = dateTime.getDayOfMonth() + val hour = dateTime.getHour() ExportRowData(metric, labels, epoch_timestamp, timestamp, value, year, month, day, hour) } } diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala index 1f5e3ff8f1..5b1fe2384a 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/DownsamplerSettings.scala @@ -154,6 +154,8 @@ class DownsamplerSettings(conf: Config = ConfigFactory.empty()) extends Serializ @transient lazy val exportDatabase = downsamplerConfig.getString("data-export.database") + @transient lazy val logAllRowErrors = downsamplerConfig.getBoolean("data-export.log-all-row-errors") + /** * Two conditions should satisfy for eligibility: * (a) If allow list is nonEmpty partKey should match a filter in the allow list. diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala index 0a6b971d13..d779aa7eea 100644 --- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala +++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala @@ -59,15 +59,14 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl | "enabled": ${exportToFile.isDefined}, | "catalog": "", | "database": "", - | "key-labels": [], | "format": "iceberg", + | "key-labels": [_ns_], | "groups": [ | { - | "key": [], + | "key": [my_ns], | "table": "", | "table-path": "${exportToFile.getOrElse("")}", | "label-column-mapping": [ - | "_ws_", "workspace", "NOT NULL", | "_ns_", "namespace", "NOT NULL" | ] | "partition-by-columns": ["namespace"] From ac0e27d32a52988acaeab7ef30e267d10ebb7c59 Mon Sep 17 00:00:00 2001 From: Alex Theimer Date: Thu, 29 Feb 2024 12:02:19 -0800 Subject: [PATCH 2/2] add comment --- .../src/main/scala/filodb/downsampler/chunk/BatchExporter.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala index af4a61c5af..365eeb5268 100644 --- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala +++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala @@ -37,6 +37,8 @@ case class ExportTableConfig(tableName: String, case class ExportRowData(metric: String, labels: collection.Map[String, String], epoch_timestamp: Long, + // IMPORTANT: a Spark-compatible value must be used here (something + // like LocalDateTime will throw exceptions). timestamp: java.sql.Timestamp, value: Double, year: Int,