fix(downsample): use jackson to serialize exported labels #1679

Merged
merged 1 commit on Oct 2, 2023
Changes from all commits
@@ -7,6 +7,8 @@ import java.util.Date
import scala.collection.mutable
import scala.util.matching.Regex

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kamon.Kamon
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
@@ -18,11 +20,10 @@ import filodb.core.query.ColumnFilter
import filodb.core.store.{ChunkSetInfoReader, ReadablePartition}
import filodb.downsampler.DownsamplerContext
import filodb.downsampler.Utils._
import filodb.downsampler.chunk.BatchExporter.{getExportLabelValueString, DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
import filodb.downsampler.chunk.BatchExporter.{DATE_REGEX_MATCHER, LABEL_REGEX_MATCHER}
import filodb.memory.format.{TypedIterator, UnsafeUtils}
import filodb.memory.format.vectors.LongIterator


case class ExportRule(allowFilterGroups: Seq[Seq[ColumnFilter]],
blockFilterGroups: Seq[Seq[ColumnFilter]],
dropLabels: Seq[String])
@@ -41,13 +42,11 @@ object BatchExporter {
val LABEL_REGEX_MATCHER: Regex = """\{\{(.*)\}\}""".r
val DATE_REGEX_MATCHER: Regex = """<<(.*)>>""".r

/**
* Converts a label's value to a value of an exported row's LABELS column.
*/
def getExportLabelValueString(value: String): String = {
value
// escape all single-quotes and commas if they aren't already escaped
.replaceAll("""\\(\,|\')|(\,|\')""", """\\$1$2""")
val JSON_MAPPER: ObjectMapper = new ObjectMapper()
JSON_MAPPER.registerModule(DefaultScalaModule)

def makeLabelString(labels: collection.Map[String, String]): String = {
JSON_MAPPER.writeValueAsString(labels)
}
}
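For reference, the new makeLabelString delegates all escaping to Jackson, so the exported LABELS value is standard JSON no matter which quotes, commas, or backslashes appear in label values. A minimal standalone sketch of that behavior (assuming the jackson-module-scala dependency implied by the imports above; the object and label names here are illustrative):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object LabelJsonSketch extends App {
  // Same setup as BatchExporter.JSON_MAPPER: registering DefaultScalaModule makes
  // Scala Maps serialize as JSON objects rather than bean-style output.
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  val labels = Map("__name__" -> "http_requests_total", "path" -> """say "hi", ok""")
  // Jackson applies JSON-spec escaping to the embedded quotes, e.g.:
  //   {"__name__":"http_requests_total","path":"say \"hi\", ok"}
  println(mapper.writeValueAsString(labels))
}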

@@ -217,14 +216,6 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
}

private def makeLabelString(labels: collection.Map[String, String]): String = {
val inner = labels
.map {case (k, v) => (k, getExportLabelValueString(v))}
.map {case (k, v) => s"'$k':'$v'"}
.mkString (",")
s"{$inner}"
}

private def mergeLabelStrings(left: String, right: String): String = {
left.substring(0, left.size - 1) + "," + right.substring(1)
}
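The unchanged mergeLabelStrings helper still splices two serialized objects with plain string surgery: it drops the closing brace of the left string and the opening brace of the right, so both inputs must already be brace-delimited objects, which the Jackson output above guarantees. A rough illustration (a standalone copy of the helper, for demonstration only):

def mergeLabelStrings(left: String, right: String): String =
  left.substring(0, left.size - 1) + "," + right.substring(1)

// e.g. merging base labels with the per-bucket labels added further down:
mergeLabelStrings("""{"job":"api"}""", """{"le":"0.5"}""")
// => {"job":"api","le":"0.5"}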
@@ -263,7 +254,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
val tupleIter = columns(valueCol).columnType match {
case DoubleColumn =>
val labelString = makeLabelString(partKeyMap)
val labelString = BatchExporter.makeLabelString(partKeyMap)
rangeInfoIter.flatMap{ info =>
val doubleIter = info.valueIter.asDoubleIt
(info.irowStart to info.irowEnd).iterator.map{ _ =>
@@ -285,7 +276,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime

// make labelString without __name__; will be replaced with _bucket-suffixed name
partKeyMap.remove("__name__")
val baseLabelString = makeLabelString(partKeyMap)
val baseLabelString = BatchExporter.makeLabelString(partKeyMap)
partKeyMap.put("__name__", nameWithoutBucket)

rangeInfoIter.flatMap{ info =>
@@ -300,7 +291,7 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
}
val bucketMapping = Map("__name__" -> nameWithBucket, "le" -> bucketTopString)
val bucketLabels = partKeyMap ++ bucketMapping
val labelString = mergeLabelStrings(baseLabelString, makeLabelString(bucketMapping))
val labelString = mergeLabelStrings(baseLabelString, BatchExporter.makeLabelString(bucketMapping))
(bucketLabels, labelString, timestamp, hist.bucketValue(i))
}
}
spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala (105 changes: 71 additions & 34 deletions)
@@ -51,7 +51,43 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl

implicit val defaultPatience = PatienceConfig(timeout = Span(30, Seconds), interval = Span(250, Millis))

val conf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf")).resolve()
// Add a path here to enable export during these tests. Useful for debugging export data.
val exportToFile = None // Some("file:///path/to/dir/")
val exportConf =
s"""{
| "filodb": { "downsampler": { "data-export": {
| "enabled": ${exportToFile.isDefined},
| "key-labels": [],
| "bucket": "${exportToFile.getOrElse("")}",
| "format": "csv",
| "options": {
| "header": true,
| "escape": "\\"",
| }
| "groups": [
| {
| "key": [],
| "rules": [
| {
| "allow-filters": [],
| "block-filters": [],
| "drop-labels": []
| }
| ]
| }
| ],
| "path-spec": [
| "year", "<<YYYY>>",
| "month", "<<M>>",
| "day", "<<d>>",
| "_metric_", "{{__name__}}"
| ]
| }}}
|}
|""".stripMargin

val baseConf = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf"))
val conf = ConfigFactory.parseString(exportConf).withFallback(baseConf).resolve()
val settings = new DownsamplerSettings(conf)
val schemas = Schemas.fromConfig(settings.filodbConfig).get
val queryConfig = QueryConfig(settings.filodbConfig.getConfig("query"))
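The test now builds its config by layering the export settings over the base server file with Typesafe Config: any key set in exportConf wins, and everything else falls back to conf/timeseries-filodb-server.conf. A minimal sketch of that layering (the override key shown is just a placeholder):

import java.io.File
import com.typesafe.config.ConfigFactory

val overrides = ConfigFactory.parseString("filodb.downsampler.data-export.enabled = false")
val base      = ConfigFactory.parseFile(new File("conf/timeseries-filodb-server.conf"))
// Keys present in `overrides` shadow `base`; unrelated keys pass through untouched.
val merged    = overrides.withFallback(base).resolve()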
@@ -396,59 +432,60 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
batchExporter.getPartitionByValues(labels).toSeq shouldEqual expected
}

it ("should correctly escape quotes in a label's value prior to export") {
val inputOutputPairs = Map(
it("should correctly escape quotes and commas in a label's value prior to export") {
val inputOutputPairs = Seq(
// empty string
"" -> "",
"" -> """{"key":""}""",
// no quotes
""" abc """ -> """ abc """,
""" abc """ -> """{"key":" abc "}""",
// ======= DOUBLE QUOTES =======
// single double-quote
""" " """ -> """ " """,
""" " """ -> """{"key":" \" "}""",
// double-quote pair
""" "" """ -> """ "" """,
""" "" """ -> """{"key":" \"\" "}""",
// escaped quote
""" \" """ -> """ \" """,
""" \" """ -> """{"key":" \\\" "}""",
// double-escaped quote
""" \\" """ -> """ \\" """,
""" \\" """ -> """{"key":" \\\\\" "}""",
// double-escaped quote pair
""" \\"" """ -> """ \\"" """,
""" \\"" """ -> """{"key":" \\\\\"\" "}""",
// complex string
""" "foo\" " ""\""\ bar "baz" """ -> """ "foo\" " ""\""\ bar "baz" """,
""" "foo\" " ""\""\ bar "baz" """ -> """{"key":" \"foo\\\" \" \"\"\\\"\"\\ bar \"baz\" "}""",
// ======= SINGLE QUOTES =======
// single single-quote
""" ' """ -> """ \' """,
""" ' """ -> """{"key":" ' "}""",
// single-quote pair
""" '' """ -> """ \'\' """,
""" '' """ -> """{"key":" '' "}""",
// escaped quote
""" \' """ -> """ \' """,
""" \' """ -> """{"key":" \\' "}""",
// double-escaped quote
""" \\' """ -> """ \\' """,
""" \\' """ -> """{"key":" \\\\' "}""",
// double-escaped quote pair
""" \\'' """ -> """ \\'\' """,
""" \\'' """ -> """{"key":" \\\\'' "}""",
// complex string
""" 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """,
""" 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""",
// ======= COMMAS =======
// single single-quote
""" , """ -> """ \, """,
// single-quote pair
""" ,, """ -> """ \,\, """,
// escaped quote
""" \, """ -> """ \, """,
// double-escaped quote
""" \\, """ -> """ \\, """,
// double-escaped quote pair
""" \\,, """ -> """ \\,\, """,
// single comma
""" , """ -> """{"key":" , "}""",
// comma pair
""" ,, """ -> """{"key":" ,, "}""",
// escaped comma
""" \, """ -> """{"key":" \\, "}""",
// double-escaped comma
""" \\, """ -> """{"key":" \\\\, "}""",
// double-escaped comma pair
""" \\,, """ -> """{"key":" \\\\,, "}""",
// complex string
""" 'foo\' ' ''\''\ bar 'baz' """ -> """ \'foo\' \' \'\'\'\'\ bar \'baz\' """,
// ======= COMBINATION =======
""" 'foo\" ' "'\'"\ bar 'baz" """ -> """ \'foo\" \' "\'\'"\ bar \'baz" """,
""" 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """ \'foo\" \' \,\, "\'\'"\ bar \'baz" \, """,
""" ["foo","ba'r:1234"] """ -> """ ["foo"\,"ba\'r:1234"] """,
""" "foo,bar:1234" """ -> """ "foo\,bar:1234" """
""" 'foo\' ' ''\''\ bar 'baz' """ -> """{"key":" 'foo\\' ' ''\\''\\ bar 'baz' "}""",
// ======= COMBINATIONS =======
""" 'foo\" ' "'\'"\ bar 'baz" """ -> """{"key":" 'foo\\\" ' \"'\\'\"\\ bar 'baz\" "}""",
""" 'foo\" ' ,, "'\'"\ bar 'baz" \, """ -> """{"key":" 'foo\\\" ' ,, \"'\\'\"\\ bar 'baz\" \\, "}""",
""" ["foo","ba'r:1234"] """ -> """{"key":" [\"foo\",\"ba'r:1234\"] "}""",
""" "foo,bar:1234" """ -> """{"key":" \"foo,bar:1234\" "}"""
)
for ((value, expected) <- inputOutputPairs) {
BatchExporter.getExportLabelValueString(value) shouldEqual expected
val map = Map("key" -> value)
BatchExporter.makeLabelString(map) shouldEqual expected
}
}
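Since the exported label string is now valid JSON, a consumer of the export can round-trip it with any JSON parser instead of relying on the old ad-hoc quote and comma escaping. A sketch using the same Jackson setup (this consumer-side code is illustrative and not part of the PR):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)

val labelString = """{"__name__":"my_counter","_ws_":"demo","le":"0.5"}"""
// Reading into a java.util.Map keeps the example dependency-light; with extra type
// information Jackson can also target a Scala Map.
val labels = mapper.readValue(labelString, classOf[java.util.Map[String, String]])
labels.get("le")  // "0.5"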
