diff --git a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
index 9eeda9842b..100bce1b54 100644
--- a/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
+++ b/spark-jobs/src/main/scala/filodb/downsampler/chunk/BatchExporter.scala
@@ -7,6 +7,8 @@ import java.util.Date
 import scala.collection.mutable
 import scala.util.matching.Regex
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import kamon.Kamon
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
@@ -18,11 +20,10 @@ import filodb.core.query.ColumnFilter
 import filodb.core.store.{ChunkSetInfoReader, ReadablePartition}
 import filodb.downsampler.DownsamplerContext
 import filodb.downsampler.Utils._
-import filodb.downsampler.chunk.BatchExporter.{getExportLabelValueString, DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
+import filodb.downsampler.chunk.BatchExporter.{DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
 import filodb.memory.format.{TypedIterator, UnsafeUtils}
 import filodb.memory.format.vectors.LongIterator
 
-
 case class ExportRule(allowFilterGroups: Seq[Seq[ColumnFilter]],
                       blockFilterGroups: Seq[Seq[ColumnFilter]],
                       dropLabels: Seq[String])
@@ -41,13 +42,11 @@ object BatchExporter {
   val LABEL_REGEX_MATCHER: Regex = """\{\{(.*)\}\}""".r
   val DATE_REGEX_MATCHER: Regex = """<<(.*)>>""".r
 
-  /**
-   * Converts a label's value to a value of an exported row's LABELS column.
-   */
-  def getExportLabelValueString(value: String): String = {
-    value
-      // escape all single-quotes and commas if they aren't already escaped
-      .replaceAll("""\\(\,|\')|(\,|\')""", """\\$1$2""")
+  val JSON_MAPPER: ObjectMapper = new ObjectMapper()
+  JSON_MAPPER.registerModule(DefaultScalaModule)
+
+  def makeLabelString(labels: collection.Map[String, String]): String = {
+    JSON_MAPPER.writeValueAsString(labels)
   }
 }
 
@@ -217,14 +216,6 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
     }
   }
 
-  private def makeLabelString(labels: collection.Map[String, String]): String = {
-    val inner = labels
-      .map {case (k, v) => (k, getExportLabelValueString(v))}
-      .map {case (k, v) => s"'$k':'$v'"}
-      .mkString (",")
-    s"{$inner}"
-  }
-
   private def mergeLabelStrings(left: String, right: String): String = {
     left.substring(0, left.size - 1) + "," + right.substring(1)
   }
@@ -263,7 +254,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
     }
     val tupleIter = columns(valueCol).columnType match {
      case DoubleColumn =>
-        val labelString = makeLabelString(partKeyMap)
+        val labelString = BatchExporter.makeLabelString(partKeyMap)
         rangeInfoIter.flatMap{ info =>
           val doubleIter = info.valueIter.asDoubleIt
           (info.irowStart to info.irowEnd).iterator.map{ _ =>
@@ -285,7 +276,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
 
         // make labelString without __name__; will be replaced with _bucket-suffixed name
         partKeyMap.remove("__name__")
-        val baseLabelString = makeLabelString(partKeyMap)
+        val baseLabelString = BatchExporter.makeLabelString(partKeyMap)
         partKeyMap.put("__name__", nameWithoutBucket)
 
         rangeInfoIter.flatMap{ info =>
@@ -300,7 +291,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
           }
           val bucketMapping = Map("__name__" -> nameWithBucket, "le" -> bucketTopString)
           val bucketLabels = partKeyMap ++ bucketMapping
-          val labelString = mergeLabelStrings(baseLabelString, makeLabelString(bucketMapping))
+          val labelString = mergeLabelStrings(baseLabelString, BatchExporter.makeLabelString(bucketMapping))
           (bucketLabels, labelString, timestamp, hist.bucketValue(i))
         }
       }
diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
index 6fa2640d82..ead8184bfd 100644
--- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
+++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
@@ -51,7 +51,43 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
 
   implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))
 
-  val conf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf")).resolve()
+  // Add a path here to enable export during these tests. Useful for debugging export data.
+  val exportToFile = None  // Some("file:///path/to/dir/")
+  val exportConf =
+    s"""{
+       |  "filodb": { "downsampler": { "data-export": {
+       |    "enabled": ${exportToFile.isDefined},
+       |    "key-labels": [],
+       |    "bucket": "${exportToFile.getOrElse("")}",
+       |    "format": "csv",
+       |    "options": {
+       |      "header": true,
+       |      "escape": "\\""
+       |    },
+       |    "groups": [
+       |      {
+       |        "key": [],
+       |        "rules": [
+       |          {
+       |            "allow-filters": [],
+       |            "block-filters": [],
+       |            "drop-labels": []
+       |          }
+       |        ]
+       |      }
+       |    ],
+       |    "path-spec": [
+       |      "year", "<<yyyy>>",
+       |      "month", "<<MM>>",
+       |      "day", "<<dd>>",
+       |      "_metric_", "{{__name__}}"
+       |    ]
+       |  }}}
+       |}
+       |""".stripMargin
+
+  val baseConf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf"))
+  val conf = ConfigFactory.parseString(exportConf).withFallback(baseConf).resolve()
   val settings = new DownsamplerSettings(conf)
   val schemas = Schemas.fromConfig(settings.filodbConfig).get
   val queryConfig = QueryConfig(settings.filodbConfig.getConfig("query"))
@@ -396,59 +432,60 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
     batchExporter.getPartitionByValues(labels).toSeq shouldEqual expected
   }
 
-  it ("should correctly escape quotes in a label's value prior to export") {
-    val inputOutputPairs = Map(
+  it("should correctly escape quotes and commas in a label's value prior to export") {
+    val inputOutputPairs = Seq(
       // empty string
-      "" -> "",
+      "" -> """{"key":""}""",
       // no quotes
-      """ abc """ -> """ abc """,
+      """ abc """ -> """{"key":" abc "}""",
       // ======= DOUBLE QUOTES =======
       // single double-quote
-      """ " """ -> """ " """,
+      """ " """ -> """{"key":" \" "}""",
       // double-quote pair
-      """ "" """ -> """ "" """,
+      """ "" """ -> """{"key":" \"\" "}""",
       // escaped quote
-      """ \" """ -> """ \" """,
+      """ \" """ -> """{"key":" \\\" "}""",
       // double-escaped quote
-      """ \\" """ -> """ \\" """,
+      """ \\" """ -> """{"key":" \\\\\" "}""",
       // double-escaped quote pair
-      """ \\"" """ -> """ \\"" """,
+      """ \\"" """ -> """{"key":" \\\\\"\" "}""",
       // complex string
-      """ "foo\" " ""\""\ bar "baz" """ -> """ "foo\" " ""\""\ bar "baz" """,
+      """ "foo\" " ""\""\ bar "baz" """ -> """{"key":" \"foo\\\" \" \"\"\\\"\"\\ bar \"baz\" "}""",
       // ======= SINGLE QUOTES =======
       // single single-quote
-      """ ' """ -> """ \' """,
+      """ ' """ -> """{"key":" ' "}""",
       // single-quote pair
-      """ '' """ -> """ \'\' """,
+      """ '' """ -> """{"key":" '' "}""",
       // escaped quote
-      """ \' """ -> """ \' """,
+      """ \' """ -> """{"key":" \\' "}""",
       // double-escaped quote
-      """ \\' """ -> """ \\' """,
+      """ \\' """ -> """{"key":" \\\\' "}""",
"}""", // double-escaped quote pair - """ \\'' """ -> """ \\'\' """, + """ \\'' """ -> """{"key":" \\\\'' "}""", // complex string - """ 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """, + """ 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""", // ======= COMMAS ======= - // single single-quote - """ , """ -> """ \, """, - // single-quote pair - """ ,, """ -> """ \,\, """, - // escaped quote - """ \, """ -> """ \, """, - // double-escaped quote - """ \\, """ -> """ \\, """, - // double-escaped quote pair - """ \\,, """ -> """ \\,\, """, + // single comma + """ , """ -> """{"key":" , "}""", + // comma pair + """ ,, """ -> """{"key":" ,, "}""", + // escaped comma + """ \, """ -> """{"key":" \\, "}""", + // double-escaped comma + """ \\, """ -> """{"key":" \\\\, "}""", + // double-escaped comma pair + """ \\,, """ -> """{"key":" \\\\,, "}""", // complex string - """ 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """, - // ======= COMBINATION ======= - """ 'foo\" ' "'\'"\ bar 'baz" """ -> """ \'foo\" \' "\'\'"\ bar \'baz" """, - """ 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """ \'foo\" \' \,\, "\'\'"\ bar \'baz" \, """, - """ ["foo","ba'r:1234"] """ -> """ ["foo"\,"ba\'r:1234"] """, - """ "foo,bar:1234" """ -> """ "foo\,bar:1234" """ + """ 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""", + // ======= COMBINATIONS ======= + """ 'foo\" ' "'\'"\ bar 'baz" """ -> """{"key":" 'foo\\\" ' \"'\\'\"\\ bar 'baz\" "}""", + """ 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """{"key":" 'foo\\\" ' ,, \"'\\'\"\\ bar 'baz\" \\, "}""", + """ ["foo","ba'r:1234"] """ -> """{"key":" [\"foo\",\"ba'r:1234\"] "}""", + """ "foo,bar:1234" """ -> """{"key":" \"foo,bar:1234\" "}""" ) for ((value, expected) <- inputOutputPairs) { - BatchExporter.getExportLabelValueString(value) shouldEqual expected + val map = Map("key" -> value) + BatchExporter.makeLabelString(map) shouldEqual expected } }