Skip to content

Commit

Permalink
fix(downsample): use jackson to serialize exported labels (#1679)
Browse files Browse the repository at this point in the history
  • Loading branch information
alextheimer authored Oct 2, 2023
1 parent b537be9 commit bcc124b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import java.util.Date
import scala.collection.mutable
import scala.util.matching.Regex

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kamon.Kamon
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
Expand All @@ -18,11 +20,10 @@ import filodb.core.query.ColumnFilter
import filodb.core.store.{ChunkSetInfoReader, ReadablePartition}
import filodb.downsampler.DownsamplerContext
import filodb.downsampler.Utils._
import filodb.downsampler.chunk.BatchExporter.{getExportLabelValueString, DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
import filodb.downsampler.chunk.BatchExporter.{DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
import filodb.memory.format.{TypedIterator, UnsafeUtils}
import filodb.memory.format.vectors.LongIterator


/**
 * Bundles the settings of a single export rule.
 *
 * NOTE(review): the semantics below are inferred from the field names only; confirm against the
 * code that evaluates these rules.
 *
 * @param allowFilterGroups groups of column filters; presumably a series is eligible for export
 *                          when it matches one of these groups -- TODO confirm
 * @param blockFilterGroups groups of column filters; presumably a series matching one of these
 *                          groups is excluded from export -- TODO confirm
 * @param dropLabels names of labels; presumably removed from a series' labels before export
 *                   -- TODO confirm
 */
case class ExportRule(allowFilterGroups: Seq[Seq[ColumnFilter]],
blockFilterGroups: Seq[Seq[ColumnFilter]],
dropLabels: Seq[String])
Expand All @@ -41,13 +42,11 @@ object BatchExporter {
// Matches text wrapped in double curly braces (e.g. {{__name__}}); group 1 captures what lies between them.
// NOTE(review): the capture is greedy, so an input with several {{...}} segments yields one match
// spanning from the first "{{" to the last "}}" -- confirm callers only pass single-template strings.
val LABEL_REGEX_MATCHER: Regex = """\{\{(.*)\}\}""".r
// Matches text wrapped in double angle brackets (e.g. <<YYYY>>); group 1 captures what lies between them.
// NOTE(review): same greedy-capture caveat as LABEL_REGEX_MATCHER.
val DATE_REGEX_MATCHER: Regex = """<<(.*)>>""".r

/**
* Converts a label's value to a value of an exported row's LABELS column.
*/
def getExportLabelValueString(value: String): String = {
value
// escape all single-quotes and commas if they aren't already escaped
.replaceAll("""\\(\,|\')|(\,|\')""", """\\$1$2""")
// Jackson mapper with the Scala module registered; registerModule returns the mapper it was
// called on, so construction and registration are expressed as a single initializer.
val JSON_MAPPER: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

/**
 * Serializes a label map to a single string by handing it to JSON_MAPPER.
 *
 * @param labels label names mapped to label values
 * @return the string JSON_MAPPER produces for the map (e.g. {"key":"value"})
 */
def makeLabelString(labels: collection.Map[String, String]): String =
  JSON_MAPPER.writeValueAsString(labels)
}

Expand Down Expand Up @@ -217,14 +216,6 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
}

/**
 * Renders a label map as a brace-wrapped, comma-separated list of 'name':'value' pairs.
 * Each value is first passed through getExportLabelValueString.
 *
 * @param labels label names mapped to label values
 * @return a string of the form {'k1':'v1','k2':'v2'}
 */
private def makeLabelString(labels: collection.Map[String, String]): String = {
  // Keep the intermediate Map step: mapping to key/value tuples rebuilds a Map, and the
  // rendered order follows that rebuilt Map's iteration order.
  val escapedLabels = labels.map { case (name, rawValue) => (name, getExportLabelValueString(rawValue)) }
  val renderedPairs = escapedLabels.map { case (name, escapedValue) => s"'$name':'$escapedValue'" }
  renderedPairs.mkString("{", ",", "}")
}

/**
 * Concatenates two brace-wrapped label strings into one.
 *
 * The merge is purely textual: the closing brace of `left` and the opening brace of `right`
 * are dropped and the remainders are joined with a comma. Both arguments are expected to be
 * strings produced by makeLabelString (i.e. starting with '{' and ending with '}').
 *
 * An empty object ("{}") on either side is returned as the other operand unchanged; splicing
 * it would otherwise leave a dangling comma ("{,...}" or "{...,}").
 *
 * @param left  label string whose entries come first in the result
 * @param right label string whose entries come last in the result
 * @return a single brace-wrapped string containing the entries of both inputs
 */
private def mergeLabelStrings(left: String, right: String): String = {
  val emptyObject = "{}"
  if (left == emptyObject) {
    right
  } else if (right == emptyObject) {
    left
  } else {
    left.substring(0, left.length - 1) + "," + right.substring(1)
  }
}
Expand Down Expand Up @@ -263,7 +254,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
val tupleIter = columns(valueCol).columnType match {
case DoubleColumn =>
val labelString = makeLabelString(partKeyMap)
val labelString = BatchExporter.makeLabelString(partKeyMap)
rangeInfoIter.flatMap{ info =>
val doubleIter = info.valueIter.asDoubleIt
(info.irowStart to info.irowEnd).iterator.map{ _ =>
Expand All @@ -285,7 +276,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

// make labelString without __name__; will be replaced with _bucket-suffixed name
partKeyMap.remove("__name__")
val baseLabelString = makeLabelString(partKeyMap)
val baseLabelString = BatchExporter.makeLabelString(partKeyMap)
partKeyMap.put("__name__", nameWithoutBucket)

rangeInfoIter.flatMap{ info =>
Expand All @@ -300,7 +291,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
val bucketMapping = Map("__name__" -> nameWithBucket, "le" -> bucketTopString)
val bucketLabels = partKeyMap ++ bucketMapping
val labelString = mergeLabelStrings(baseLabelString, makeLabelString(bucketMapping))
val labelString = mergeLabelStrings(baseLabelString, BatchExporter.makeLabelString(bucketMapping))
(bucketLabels, labelString, timestamp, hist.bucketValue(i))
}
}
Expand Down
105 changes: 71 additions & 34 deletions spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,43 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl

implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))

val conf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf")).resolve()
// Add a path here to enable export during these tests. Useful for debugging export data.
val exportToFile = None // Some("file:///path/to/dir/")
val exportConf =
s"""{
| "filodb": { "downsampler": { "data-export": {
| "enabled": ${exportToFile.isDefined},
| "key-labels": [],
| "bucket": "${exportToFile.getOrElse("")}",
| "format": "csv",
| "options": {
| "header": true,
| "escape": "\\"",
| }
| "groups": [
| {
| "key": [],
| "rules": [
| {
| "allow-filters": [],
| "block-filters": [],
| "drop-labels": []
| }
| ]
| }
| ],
| "path-spec": [
| "year", "<<YYYY>>",
| "month", "<<M>>",
| "day", "<<d>>",
| "_metric_", "{{__name__}}"
| ]
| }}}
|}
|""".stripMargin

val baseConf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf"))
val conf = ConfigFactory.parseString(exportConf).withFallback(baseConf).resolve()
val settings = new DownsamplerSettings(conf)
val schemas = Schemas.fromConfig(settings.filodbConfig).get
val queryConfig = QueryConfig(settings.filodbConfig.getConfig("query"))
Expand Down Expand Up @@ -396,59 +432,60 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
batchExporter.getPartitionByValues(labels).toSeq shouldEqual expected
}

it ("should correctly escape quotes in a label's value prior to export") {
val inputOutputPairs = Map(
it("should correctly escape quotes and commas in a label's value prior to export") {
val inputOutputPairs = Seq(
// empty string
"" -> "",
"" -> """{"key":""}""",
// no quotes
""" abc """ -> """ abc """,
""" abc """ -> """{"key":" abc "}""",
// ======= DOUBLE QUOTES =======
// single double-quote
""" " """ -> """ " """,
""" " """ -> """{"key":" \" "}""",
// double-quote pair
""" "" """ -> """ "" """,
""" "" """ -> """{"key":" \"\" "}""",
// escaped quote
""" \" """ -> """ \" """,
""" \" """ -> """{"key":" \\\" "}""",
// double-escaped quote
""" \\" """ -> """ \\" """,
""" \\" """ -> """{"key":" \\\\\" "}""",
// double-escaped quote pair
""" \\"" """ -> """ \\"" """,
""" \\"" """ -> """{"key":" \\\\\"\" "}""",
// complex string
""" "foo\" " ""\""\ bar "baz" """ -> """ "foo\" " ""\""\ bar "baz" """,
""" "foo\" " ""\""\ bar "baz" """ -> """{"key":" \"foo\\\" \" \"\"\\\"\"\\ bar \"baz\" "}""",
// ======= SINGLE QUOTES =======
// single single-quote
""" ' """ -> """ \' """,
""" ' """ -> """{"key":" ' "}""",
// single-quote pair
""" '' """ -> """ \'\' """,
""" '' """ -> """{"key":" '' "}""",
// escaped quote
""" \' """ -> """ \' """,
""" \' """ -> """{"key":" \\' "}""",
// double-escaped quote
""" \\' """ -> """ \\' """,
""" \\' """ -> """{"key":" \\\\' "}""",
// double-escaped quote pair
""" \\'' """ -> """ \\'\' """,
""" \\'' """ -> """{"key":" \\\\'' "}""",
// complex string
""" 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """,
""" 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""",
// ======= COMMAS =======
// single single-quote
""" , """ -> """ \, """,
// single-quote pair
""" ,, """ -> """ \,\, """,
// escaped quote
""" \, """ -> """ \, """,
// double-escaped quote
""" \\, """ -> """ \\, """,
// double-escaped quote pair
""" \\,, """ -> """ \\,\, """,
// single comma
""" , """ -> """{"key":" , "}""",
// comma pair
""" ,, """ -> """{"key":" ,, "}""",
// escaped comma
""" \, """ -> """{"key":" \\, "}""",
// double-escaped comma
""" \\, """ -> """{"key":" \\\\, "}""",
// double-escaped comma pair
""" \\,, """ -> """{"key":" \\\\,, "}""",
// complex string
""" 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """,
// ======= COMBINATION =======
""" 'foo\" ' "'\'"\ bar 'baz" """ -> """ \'foo\" \' "\'\'"\ bar \'baz" """,
""" 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """ \'foo\" \' \,\, "\'\'"\ bar \'baz" \, """,
""" ["foo","ba'r:1234"] """ -> """ ["foo"\,"ba\'r:1234"] """,
""" "foo,bar:1234" """ -> """ "foo\,bar:1234" """
""" 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""",
// ======= COMBINATIONS =======
""" 'foo\" ' "'\'"\ bar 'baz" """ -> """{"key":" 'foo\\\" ' \"'\\'\"\\ bar 'baz\" "}""",
""" 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """{"key":" 'foo\\\" ' ,, \"'\\'\"\\ bar 'baz\" \\, "}""",
""" ["foo","ba'r:1234"] """ -> """{"key":" [\"foo\",\"ba'r:1234\"] "}""",
""" "foo,bar:1234" """ -> """{"key":" \"foo,bar:1234\" "}"""
)
for ((value, expected) <- inputOutputPairs) {
BatchExporter.getExportLabelValueString(value) shouldEqual expected
val map = Map("key" -> value)
BatchExporter.makeLabelString(map) shouldEqual expected
}
}

Expand Down

0 comments on commit bcc124b

Please sign in to comment.