Skip to content

Commit

Permalink
feat(cache): add control auto cache-count option (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyogev authored Aug 1, 2019
1 parent 694c46f commit f3cbf34
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
5 changes: 4 additions & 1 deletion config/job_config_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,7 @@ streaming:
batchMode: true
# Add any other options supported by the DataStreamWriter
extraOptions:
opt: val
opt: val

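# Optional: when true (the default), each batch output DataFrame is cached and counted
# before it is written, and the row count is reported to instrumentation.
# Set to false to skip the cache/count pass; the reported counter is then 0.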
cacheCountOnOutput: false
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ case class Configuration(metrics: Option[Seq[String]],
var showPreviewLines: Option[Int],
var explain: Option[Boolean],
var appName: Option[String],
var continueOnFailedStep: Option[Boolean]) {
var continueOnFailedStep: Option[Boolean],
var cacheCountOnOutput: Option[Boolean]) {

require(metrics.isDefined, "metrics files paths are mandatory")

Expand All @@ -25,6 +26,7 @@ case class Configuration(metrics: Option[Seq[String]],
explain = Option(explain.getOrElse(false))
appName = Option(appName.getOrElse("Metorikku"))
continueOnFailedStep = Option(continueOnFailedStep.getOrElse(false))
cacheCountOnOutput = Option(cacheCountOnOutput.getOrElse(true))

def getReaders: Seq[Reader] = inputs.getOrElse(Map()).map {
case (name, input) => input.getReader(name) }.toSeq
Expand Down
16 changes: 12 additions & 4 deletions src/main/scala/com/yotpo/metorikku/metric/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,18 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
dataFrameName: String,
writer: Writer,
outputType: OutputType,
instrumentationProvider: InstrumentationProvider): Unit = {
dataFrame.cache()
instrumentationProvider: InstrumentationProvider,
cacheCountOnOutput: Option[Boolean]): Unit = {

val dataFrameCount = cacheCountOnOutput match {
case Some(true) => {
dataFrame.cache()
dataFrame.count()
}
case _ => 0
}
val tags = Map("metric" -> metricName, "dataframe" -> dataFrameName, "output_type" -> outputType.toString)
instrumentationProvider.count(name="counter", value=dataFrame.count(), tags=tags)
instrumentationProvider.count(name="counter", value=dataFrameCount, tags=tags)
log.info(s"Starting to Write results of ${dataFrameName}")
try {
writer.write(dataFrame)
Expand Down Expand Up @@ -126,7 +134,7 @@ case class Metric(configuration: Configuration, metricDir: File, metricName: Str
}
else {
writeBatch(dataFrame, dataFrameName, writer,
outputConfig.outputType, job.instrumentationClient)
outputConfig.outputType, job.instrumentationClient, job.config.cacheCountOnOutput)
}
})

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/yotpo/metorikku/test/Tester.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ case class Tester(config: TesterConfig) {
val variables = params.variables
val inputs = getMockFilesFromDir(config.test.mocks, config.basePath)
Configuration(Option(metrics),inputs, variables, None, None, None, None, None,
Option(config.preview > 0), None, None, Option(config.preview), None, None, None)
Option(config.preview > 0), None, None, Option(config.preview), None, None, None, None)
}

private def getMockFilesFromDir(mocks: Option[List[Mock]], testDir: File): Option[Map[String, Input]] = {
Expand Down

0 comments on commit f3cbf34

Please sign in to comment.