diff --git a/src/main/scala/com/databricks/labs/overwatch/utils/PipelineValidationHelper.scala b/src/main/scala/com/databricks/labs/overwatch/utils/PipelineValidationHelper.scala
index ae1941fbb..fc5b76b4b 100644
--- a/src/main/scala/com/databricks/labs/overwatch/utils/PipelineValidationHelper.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/utils/PipelineValidationHelper.scala
@@ -16,7 +16,7 @@ import scala.collection.mutable.ArrayBuffer
  * This class contains the utility functions for PipelineValidation.scala.
  */
 
-class PipelineValidationHelper(_etlDB: String ,_allRun: Boolean = true) extends SparkSessionWrapper {
+class PipelineValidationHelper(_etlDB: String ,_allRun: Boolean = true,_dateWindow: Array[String] = Array()) extends SparkSessionWrapper {
 
   import spark.implicits._
 
@@ -32,6 +32,7 @@ class PipelineValidationHelper(_etlDB: String ,_allRun: Boolean = true) extends
   def quarantineID: String = _quarantine_id
 
   def etlDB: String = _etlDB
+  def dateWindow: Array[String] = _dateWindow
 
   val allRun: Boolean = _allRun
   private var _validations: ArrayBuffer[HealthCheckReport] = new ArrayBuffer[HealthCheckReport]()
@@ -76,9 +77,20 @@ class PipelineValidationHelper(_etlDB: String ,_allRun: Boolean = true) extends
       All_Overwatch_RunID
     }else {
       if (Helpers.pathExists(healthCheckReportPath)) {
-        val healthCheckDF = spark.read.load(healthCheckReportPath)
-        val Validated_Overwatch_RunIDs = healthCheckDF.select("Overwatch_RunID").distinct().collect().map(_.getString(0))
-        All_Overwatch_RunID.diff(Validated_Overwatch_RunIDs)
+        if (dateWindow.isEmpty) {
+          val Msg = s"Date Window is not provided. Validation will run for all overwatch_runID which is yet to be validated"
+          logger.log(Level.INFO, Msg)
+          val healthCheckDF = spark.read.load(healthCheckReportPath)
+          val Validated_Overwatch_RunIDs = healthCheckDF.select("Overwatch_RunID").distinct().collect().map(_.getString(0))
+          All_Overwatch_RunID.diff(Validated_Overwatch_RunIDs)
+        }else{
+          val start_date = dateWindow(0)
+          val end_date = dateWindow(1)
+          val filtered_overwatch_RunID = spark.read.table(s"$etlDB.pipeline_report").filter(
+            col("Pipeline_SnapTS") > start_date && col("Pipeline_SnapTS") <= end_date)
+            .select("Overwatch_RunID").distinct().collect().map(_.getString(0))
+          filtered_overwatch_RunID
+        }
       } else {
         All_Overwatch_RunID
       }
diff --git a/src/main/scala/com/databricks/labs/overwatch/validation/PipelineValidation.scala b/src/main/scala/com/databricks/labs/overwatch/validation/PipelineValidation.scala
index 40a7f1a14..a23a1affb 100644
--- a/src/main/scala/com/databricks/labs/overwatch/validation/PipelineValidation.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/validation/PipelineValidation.scala
@@ -15,27 +15,27 @@ import org.apache.spark.sql.DataFrame
  */
 
 object PipelineValidation extends SparkSessionWrapper {
 
-  def apply(etlDB : String , allRun: Boolean) :Unit = {
-    new PipelineValidation(etlDB,allRun)
+  def apply(etlDB : String , allRun: Boolean,_dateWindow: Array[String]) :Unit = {
+    new PipelineValidation(etlDB,allRun,_dateWindow)
       .setPipelineSnapTime()
       .process()
   }
 
-  def apply(etlDB : String, allRun: Boolean, table : Array[String]) :Unit = {
-    new PipelineValidation(etlDB,allRun)
+  def apply(etlDB : String, allRun: Boolean,_dateWindow: Array[String], table : Array[String]) :Unit = {
+    new PipelineValidation(etlDB,allRun,_dateWindow)
       .setPipelineSnapTime()
       .process(table)
   }
 
-  def apply(etlDB : String, allRun: Boolean, table : Array[String],crossTableValidation : Boolean) :Unit = {
-    new PipelineValidation(etlDB,allRun)
+  def apply(etlDB : String, allRun: Boolean,_dateWindow: Array[String], table : Array[String],crossTableValidation : Boolean) :Unit = {
+    new PipelineValidation(etlDB,allRun,_dateWindow)
      .setPipelineSnapTime()
      .process(table,crossTableValidation)
   }
 }
 
-class PipelineValidation (_etlDB: String, _allRun: Boolean) extends PipelineValidationHelper(_etlDB,_allRun) with SparkSessionWrapper {
+class PipelineValidation (_etlDB: String, _allRun: Boolean,_dateWindow: Array[String]) extends PipelineValidationHelper(_etlDB,_allRun,_dateWindow) with SparkSessionWrapper {
 
   import spark.implicits._
 
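Usage sketch (not part of the patch): with _dateWindow threaded through, a caller can scope validation to the Overwatch runs whose Pipeline_SnapTS falls inside a start/end window. The database and table names below are illustrative, and the snippet assumes a Spark session with the Overwatch ETL database attached. Note that the helper reads dateWindow(0) and dateWindow(1) directly, so a non-empty window must be a two-element Array(start, end), and the filter is exclusive of start and inclusive of end.

import com.databricks.labs.overwatch.validation.PipelineValidation

// Validate only runs with start < Pipeline_SnapTS <= end.
// allRun = false, otherwise every Overwatch_RunID is validated and the
// date-window branch is never reached.
PipelineValidation("overwatch_etl", false, Array("2024-01-01", "2024-01-31"))

// Empty window: falls back to validating every Overwatch_RunID not yet
// present in the health-check report.
PipelineValidation("overwatch_etl", false, Array())

// Window plus an explicit table list, via the second overload
// ("jobrun_silver" is a placeholder table name).
PipelineValidation("overwatch_etl", false, Array("2024-01-01", "2024-01-31"), Array("jobrun_silver"))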