Commit: TMP
alextheimer committed Jan 21, 2024
1 parent 585b658 commit fd34b37
Showing 2 changed files with 7 additions and 5 deletions.

@@ -3,16 +3,13 @@ package filodb.downsampler.chunk
 import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util.Date
-
 import scala.collection.mutable
 import scala.util.matching.Regex
-
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import kamon.Kamon
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}
-
 import filodb.core.binaryrecord2.RecordSchema
 import filodb.core.metadata.Column.ColumnType.{DoubleColumn, HistogramColumn}
 import filodb.core.metadata.Schemas
@@ -24,6 +21,9 @@ import filodb.downsampler.chunk.BatchExporter.{DATE_REGEX_MATCHER, LABEL_REGEX_M
 import filodb.memory.format.{TypedIterator, UnsafeUtils}
 import filodb.memory.format.vectors.LongIterator
 
+
+import scala.util.Random
+
 case class ExportRule(allowFilterGroups: Seq[Seq[ColumnFilter]],
                       blockFilterGroups: Seq[Seq[ColumnFilter]],
                       dropLabels: Seq[String])
@@ -187,7 +187,9 @@ case class BatchExporter(downsamplerSettings: DownsamplerSettings, userStartTime
    */
   def getPartitionByValues(partKeyMap: collection.Map[String, String]): Iterator[String] = {
     partitionByValuesTemplate.iterator.zipWithIndex.map{ case (value, i) =>
-      if (partitionByValuesIndicesWithTemplate.contains(i)) {
+      if (i == 0) {
+        "ws" + Random.nextInt(3).toString
+      } else if (partitionByValuesIndicesWithTemplate.contains(i)) {
         LABEL_REGEX_MATCHER.replaceAllIn(partitionByValuesTemplate(i), matcher => partKeyMap(matcher.group(1)))
       } else {
         value
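
The behavioral change here, in isolation: instead of always rendering the first partition-by value from the configured template, the exporter now assigns it a random `ws0`/`ws1`/`ws2` shard. Below is a minimal self-contained sketch of the patched logic; the two-entry template, the `{{label}}` substitution syntax, and all field values are assumptions standing in for the real `BatchExporter` state, not FiloDB's actual configuration.

```scala
import scala.util.Random
import scala.util.matching.Regex

object PartitionByValuesSketch extends App {
  // Hypothetical stand-ins for the BatchExporter fields referenced in the diff.
  val LABEL_REGEX_MATCHER: Regex = """\{\{(.*?)\}\}""".r        // assumed template syntax
  val partitionByValuesTemplate = Seq("{{_ws_}}", "{{_ns_}}")   // assumed two-entry template
  val partitionByValuesIndicesWithTemplate = Set(0, 1)
  val partKeyMap = Map("_ws_" -> "demo-ws", "_ns_" -> "demo-ns")

  val values = partitionByValuesTemplate.iterator.zipWithIndex.map { case (value, i) =>
    if (i == 0) {
      "ws" + Random.nextInt(3).toString  // random shard: "ws0", "ws1", or "ws2"
    } else if (partitionByValuesIndicesWithTemplate.contains(i)) {
      // Render the remaining values from the template via label substitution.
      LABEL_REGEX_MATCHER.replaceAllIn(partitionByValuesTemplate(i), m => partKeyMap(m.group(1)))
    } else {
      value
    }
  }
  println(values.mkString("/"))  // e.g. "ws1/demo-ns"; the first value ignores the template
}
```

`Random.nextInt(3)` returns 0, 1, or 2, so export rows are spread across three synthetic `ws*` partitions regardless of their actual workspace label, which reads as a temporary experiment consistent with the commit title.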

@@ -158,7 +158,7 @@ class Downsampler(settings: DownsamplerSettings) extends Serializable {
       if (settings.exportIsEnabled) {
         batchExporter.getExportRows(readablePartsBatch)
       } else Iterator.empty
-    }.map(row => row.)
+    }
 
     val repartitioned = spark
       .createDataFrame(rdd, batchExporter.exportSchema)
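
With the dangling `.map(row => row.)` dropped (it would not even parse), `rdd` is the plain `RDD[Row]` produced by the export `flatMap`, which is exactly the shape `spark.createDataFrame(rdd, batchExporter.exportSchema)` requires: an `RDD[Row]` plus an explicit `StructType`. A runnable sketch of that pattern follows; the column names and sample rows are illustrative, not FiloDB's actual export schema.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ExportDataFrameSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("export-sketch").getOrCreate()

  // Stand-in for batchExporter.exportSchema (column names are assumptions).
  val exportSchema = StructType(Seq(
    StructField("workspace", StringType),
    StructField("namespace", StringType),
    StructField("timestamp", LongType)
  ))

  // Stand-in for the flatMap over export batches: an RDD[Row] matching the schema.
  val rdd = spark.sparkContext.parallelize(Seq(
    Row("ws0", "demo-ns", 1705795200000L),
    Row("ws2", "demo-ns", 1705795260000L)
  ))

  // Same pattern as `spark.createDataFrame(rdd, batchExporter.exportSchema)` above.
  val repartitioned = spark.createDataFrame(rdd, exportSchema).repartition(2)
  repartitioned.show()
  spark.stop()
}
```

The explicit schema is required here because `Row` is untyped: Spark can infer a schema from an RDD of case classes or other `Product` types, but not from an `RDD[Row]`.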
