Skip to content

Commit

Permalink
Creation of Exact Quantile Check (#512)
Browse files Browse the repository at this point in the history
* Creation of Exact Quantile Check

* Fix build issue

---------

Co-authored-by: ZDQ870 <[email protected]>
  • Loading branch information
jmilis2000 and ZDQ870 authored Oct 27, 2023
1 parent a529d4b commit cedcf07
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 2 deletions.
65 changes: 65 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/ExactQuantile.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric}
import com.amazon.deequ.analyzers.Analyzers.{conditionalSelection, ifNoNullsIn}
import com.amazon.deequ.metrics.FullColumn
import org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.{DoubleType, StructType}

/**
  * State produced by the [[ExactQuantile]] analyzer: the quantile value that Spark already
  * computed, together with the quantile that was requested.
  *
  * @param exactQuantile The computed quantile value; this is what the metric reports
  * @param quantile      The quantile that was requested from the analyzer
  * @param fullColumn    Optional column expression the value was computed over
  *                      ([[ExactQuantile]] passes the filtered column cast to double)
  */
case class ExactQuantileState(
    exactQuantile: Double,
    quantile: Double,
    override val fullColumn: Option[Column] = None)
  extends DoubleValuedState[ExactQuantileState] with FullColumn {

  /**
    * Exact quantile states cannot be merged.
    *
    * The state only holds the final quantile value of one dataset. The exact quantile of the
    * union of two datasets cannot be derived from the two individual quantile values, so there
    * is no correct result to return here. The previous implementation built a Spark `Column`
    * via `expr(...)` and called `.toString().toDouble` on it; a `Column`'s string form is the
    * expression text, so that call could never yield a number and failed with an unrelated
    * parsing error. The failure is now explicit and explains what to do instead.
    *
    * @param other State computed on another dataset or partition
    * @throws UnsupportedOperationException always
    */
  override def sum(other: ExactQuantileState): ExactQuantileState = {
    throw new UnsupportedOperationException(
      s"ExactQuantileState cannot be merged: the exact quantile ($quantile) of combined data " +
        "cannot be derived from the quantile values of its parts. " +
        "Recompute ExactQuantile on the full dataset instead.")
  }

  /** The computed exact quantile. */
  override def metricValue(): Double = {
    exactQuantile
  }
}

/**
  * Analyzer that computes an exact quantile of a numeric column with Spark SQL's `percentile`
  * aggregate function.
  *
  * @param column   Column to compute the quantile of
  * @param quantile Which quantile to compute; passed to `percentile` unchanged
  * @param where    Additional filter to apply before the analyzer is run
  */
case class ExactQuantile(
    column: String,
    quantile: Double,
    where: Option[String] = None)
  extends StandardScanShareableAnalyzer[ExactQuantileState]("ExactQuantile", column)
  with FilterableAnalyzer {

  /** A single `percentile` aggregation over the (optionally filtered) column cast to double. */
  override def aggregationFunctions(): Seq[Column] = {
    // The selection column is rendered into the SQL text through its string form.
    val percentileSql = s"percentile($criterion, $quantile)"
    List(expr(percentileSql))
  }

  /** Reads the aggregated double at `offset`; yields no state when that result is null. */
  override def fromAggregationResult(result: Row, offset: Int): Option[ExactQuantileState] =
    ifNoNullsIn(result, offset) { _ =>
      val quantileValue = result.getDouble(offset)
      ExactQuantileState(quantileValue, quantile, Some(criterion))
    }

  /** The column has to exist and be numeric. */
  override protected def additionalPreconditions(): Seq[StructType => Unit] =
    List(hasColumn(column), isNumeric(column))

  override def filterCondition: Option[String] = where

  /** Column selection (with the optional `where` filter) cast to double. */
  @VisibleForTesting
  private def criterion: Column = {
    val selection = conditionalSelection(column, where)
    selection.cast(DoubleType)
  }
}
5 changes: 5 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/StateProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ case class HdfsStateProvider(
val serializedDigest = ApproximatePercentile.serializer.serialize(percentileDigest)
persistBytes(serializedDigest, identifier)

case _: ExactQuantile =>
persistDoubleState(state.asInstanceOf[ExactQuantileState].exactQuantile, identifier)

case _ =>
throw new IllegalArgumentException(s"Unable to persist state for analyzer $analyzer.")
}
Expand Down Expand Up @@ -177,6 +180,8 @@ case class HdfsStateProvider(
val percentileDigest = ApproximatePercentile.serializer.deserialize(loadBytes(identifier))
ApproxQuantileState(percentileDigest)

case _: ExactQuantile => ExactQuantile(identifier, loadDoubleState(identifier))

case _ =>
throw new IllegalArgumentException(s"Unable to load state for analyzer $analyzer.")
}
Expand Down
20 changes: 20 additions & 0 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ case class Check(
approxQuantileConstraint(column, quantile, assertion, filter, hint))
}

/**
  * Adds a constraint that asserts on an exact quantile of a column.
  *
  * @param column    Column to run the assertion on
  * @param quantile  Which quantile to assert on
  * @param assertion Function that receives the computed quantile as a double
  *                  and returns whether the constraint holds
  * @param hint      A hint to provide additional context why a constraint could have failed
  * @return The check with the new constraint added; a filter can still be attached to it
  */
def hasExactQuantile(
    column: String,
    quantile: Double,
    assertion: Double => Boolean,
    hint: Option[String] = None)
  : CheckWithLastConstraintFilterable = {

  addFilterableConstraint { whereClause =>
    exactQuantileConstraint(column, quantile, assertion, whereClause, hint)
  }
}

/**
* Creates a constraint that asserts on the minimum length of the column
*
Expand Down
30 changes: 30 additions & 0 deletions src/main/scala/com/amazon/deequ/constraints/Constraint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,36 @@ object Constraint {
new NamedConstraint(constraint, s"ApproxQuantileConstraint($approxQuantile)")
}

/**
  * Builds a constraint that runs the exact quantile analysis on a column and applies the
  * assertion to the computed value.
  *
  * @param column    Column to run the assertion on
  * @param quantile  Which quantile to assert on
  * @param assertion Function that receives the computed quantile as a double
  *                  and returns a boolean
  * @param where     Additional filter to apply before the analyzer is run
  * @param hint      A hint to provide additional context why a constraint could have failed
  */
def exactQuantileConstraint(
    column: String,
    quantile: Double,
    assertion: Double => Boolean,
    where: Option[String] = None,
    hint: Option[String] = None)
  : Constraint =
  fromAnalyzer(ExactQuantile(column, quantile, where), assertion, hint)

/**
  * Wraps an [[ExactQuantile]] analyzer and an assertion into a named constraint.
  *
  * @param exactQuantile Analyzer whose metric value is handed to the assertion
  * @param assertion     Function that receives the computed quantile and returns a boolean
  * @param hint          A hint to provide additional context why a constraint could have failed
  */
def fromAnalyzer(exactQuantile: ExactQuantile, assertion: Double => Boolean, hint: Option[String]): Constraint = {
  val constraintName = s"ExactQuantileConstraint($exactQuantile)"
  val analysisConstraint = AnalysisBasedConstraint[ExactQuantileState, Double, Double](
    exactQuantile, assertion, hint = hint)

  new NamedConstraint(analysisConstraint, constraintName)
}

/**
* Runs max length analysis on the given column and executes the assertion
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ private[deequ] object AnalyzerSerializer
result.addProperty("quantiles", approxQuantiles.quantiles.mkString(","))
result.addProperty("relativeError", approxQuantiles.relativeError)

case exactQuantile: ExactQuantile =>
result.addProperty(ANALYZER_NAME_FIELD, "ExactQuantile")
result.addProperty(COLUMN_FIELD, exactQuantile.column)
result.addProperty("quantile", exactQuantile.quantile)
result.addProperty(WHERE_FIELD, exactQuantile.where.orNull)


case minLength: MinLength =>
result.addProperty(ANALYZER_NAME_FIELD, "MinLength")
Expand Down Expand Up @@ -481,6 +487,11 @@ private[deequ] object AnalyzerDeserializer
val relativeError = json.get("relativeError").getAsDouble
ApproxQuantiles(column, quantile, relativeError)

case "ExactQuantile" =>
val column = json.get(COLUMN_FIELD).getAsString
val quantile = json.get("quantile").getAsDouble
ExactQuantile(column, quantile)

case "MinLength" =>
MinLength(
json.get(COLUMN_FIELD).getAsString,
Expand Down
16 changes: 15 additions & 1 deletion src/test/scala/com/amazon/deequ/checks/CheckTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val numericAnalysis = AnalysisRunner.onData(dfNumeric).addAnalyzers(Seq(
Minimum("att1"), Maximum("att1"), Mean("att1"), Sum("att1"),
StandardDeviation("att1"), ApproxCountDistinct("att1"),
ApproxQuantile("att1", quantile = 0.5)))
ApproxQuantile("att1", quantile = 0.5), ExactQuantile("att1", quantile = 0.5)))

val contextNumeric = numericAnalysis.run()

Expand All @@ -594,6 +594,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
assertSuccess(baseCheck.hasStandardDeviation("att1", _ == 1.707825127659933), contextNumeric)
assertSuccess(baseCheck.hasApproxCountDistinct("att1", _ == 6.0), contextNumeric)
assertSuccess(baseCheck.hasApproxQuantile("att1", quantile = 0.5, _ == 3.0), contextNumeric)
assertSuccess(baseCheck.hasExactQuantile("att1", quantile = 0.5, _ == 3.5), contextNumeric)

val correlationAnalysisInformative = AnalysisRunner.onData(dfInformative)
.addAnalyzer(Correlation("att1", "att2"))
Expand Down Expand Up @@ -634,6 +635,19 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
assertSuccess(hasApproxQuantileCheckWithFilter, context)
}

"correctly evaluate hasExactQuantile constraints" in withSparkSession { sparkSession =>
  val numericData = getDfWithNumericValues(sparkSession)

  // Median of att1 over all rows.
  val unfilteredCheck = Check(CheckLevel.Error, "a")
    .hasExactQuantile("att1", quantile = 0.5, _ == 3.5)
  // Median of att1 restricted to the rows matching the filter.
  val filteredCheck = Check(CheckLevel.Error, "a")
    .hasExactQuantile("att1", quantile = 0.5, _ == 5.0)
    .where("att2 > 0")

  val context = runChecks(numericData, unfilteredCheck, filteredCheck)

  Seq(unfilteredCheck, filteredCheck).foreach { check =>
    assertSuccess(check, context)
  }
}

"yield correct results for minimum and maximum length stats" in
withSparkSession { sparkSession =>
val baseCheck = Check(CheckLevel.Error, description = "a description")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
MinLength("ColumnA") ->
DoubleMetric(Entity.Column, "MinLength", "ColumnA", Success(5.0)),
MaxLength("ColumnA") ->
DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0))
DoubleMetric(Entity.Column, "MaxLength", "ColumnA", Success(5.0)),
ExactQuantile("ColumnA", 0.5) ->
DoubleMetric(Entity.Column, "Completeness", "ColumnA", Success(5.0))
))

val dateTime = LocalDate.of(2017, 10, 14).atTime(10, 10, 10)
Expand Down Expand Up @@ -173,6 +175,16 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers {
assertCorrectlyConvertsAnalysisResults(Seq(result))
}

"serialization of ExactQuantile" should "correctly restore it" in {
  // Round-trips one ExactQuantile analyzer with its metric through the result serde.
  val exactQuantileAnalyzer = ExactQuantile("col", 0.5)
  val exactQuantileMetric = DoubleMetric(Entity.Column, "ExactQuantile", "col", Success(0.5))

  val analyzerContext = AnalyzerContext(Map(exactQuantileAnalyzer -> exactQuantileMetric))
  val analysisResult = new AnalysisResult(ResultKey(0), analyzerContext)

  assertCorrectlyConvertsAnalysisResults(Seq(analysisResult))
}

val histogramSumJson =
"""[
| {
Expand Down

0 comments on commit cedcf07

Please sign in to comment.