From 525af319e775941e7f2adb21fdd467191fa2a98d Mon Sep 17 00:00:00 2001 From: Scott Gunn Date: Tue, 26 Mar 2024 13:46:48 -0400 Subject: [PATCH 1/5] Added RatioOfSums analyzer and tests --- .../amazon/deequ/analyzers/RatioOfSums.scala | 81 +++++++++++++++++++ .../repository/AnalysisResultSerde.scala | 12 +++ .../deequ/analyzers/AnalyzerTests.scala | 11 +++ .../repository/AnalysisResultSerdeTest.scala | 2 + 4 files changed, 106 insertions(+) create mode 100644 src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala new file mode 100644 index 00000000..d80aee66 --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala @@ -0,0 +1,81 @@ +package com.amazon.deequ.analyzers + +import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric} +import com.amazon.deequ.metrics.Entity +import org.apache.spark.sql.DeequFunctions.stateful_corr +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.StructType +import Analyzers._ + +import com.amazon.deequ.metrics.Entity +import com.amazon.deequ.repository.AnalysisResultSerde + +case class RatioOfSumsState( + numerator: Double, + denominator: Double +) extends DoubleValuedState[RatioOfSumsState] { + + override def sum(other: RatioOfSumsState): RatioOfSumsState = { + val n1 = numerator + val n2 = other.numerator + val newN = n1 + n2 + val t1 = denominator + val t2 = other.denominator + val newD = t1 + t2 + + RatioOfSumsState(newN, newD) + } + + override def metricValue(): Double = { + numerator / denominator + } +} + +/** Sums up 2 columns and then divides the final values + * + * @param numerator + * First input column for computation + * @param denominator + * Second input column for computation + */ +case class RatioOfSums( + numerator: String, + denominator: String, + where: Option[String] = None +) extends StandardScanShareableAnalyzer[RatioOfSumsState]( + "RatioOfSums", + s"$numerator,$denominator", + Entity.Multicolumn + ) + with FilterableAnalyzer { + + override def aggregationFunctions(): Seq[Column] = { + val firstSelection = conditionalSelection(numerator, where) + val secondSelection = conditionalSelection(denominator, where) + sum(firstSelection).cast(DoubleType) :: sum(secondSelection).cast(DoubleType) :: Nil + } + + override def fromAggregationResult( + result: Row, + offset: Int + ): Option[RatioOfSumsState] = { + if (result.isNullAt(offset)) { + None + } else { + Some( + RatioOfSumsState( + result.getDouble(0), + result.getDouble(1) + ) + ) + } + } + + override protected def additionalPreconditions(): Seq[StructType => Unit] = { + hasColumn(numerator) :: isNumeric(numerator) :: hasColumn(denominator) :: isNumeric(denominator) :: Nil + } + + override def filterCondition: Option[String] = where +} diff --git a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala index e9bb4f7d..eb0db536 100644 --- a/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala +++ b/src/main/scala/com/amazon/deequ/repository/AnalysisResultSerde.scala @@ -256,6 +256,12 @@ private[deequ] object AnalyzerSerializer result.addProperty(COLUMN_FIELD, sum.column) result.addProperty(WHERE_FIELD, sum.where.orNull) + case ratioOfSums: RatioOfSums => + result.addProperty(ANALYZER_NAME_FIELD, "RatioOfSums") + result.addProperty("numerator", ratioOfSums.numerator) + result.addProperty("denominator", ratioOfSums.denominator) + result.addProperty(WHERE_FIELD, ratioOfSums.where.orNull) + case mean: Mean => result.addProperty(ANALYZER_NAME_FIELD, "Mean") result.addProperty(COLUMN_FIELD, mean.column) @@ -412,6 +418,12 @@ private[deequ] object AnalyzerDeserializer json.get(COLUMN_FIELD).getAsString, getOptionalWhereParam(json)) + case "RatioOfSums" => + RatioOfSums( + json.get("numerator").getAsString, + json.get("denominator").getAsString, + getOptionalWhereParam(json)) + case "Mean" => Mean( json.get(COLUMN_FIELD).getAsString, diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala index 1c0b28d1..c24fb08f 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala @@ -847,6 +847,17 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with analyzer.calculate(df).value shouldBe Success(2.0 / 8.0) assert(analyzer.calculate(df).fullColumn.isDefined) } + + "compute ratio of sums correctly for numeric data" in withSparkSession { session => + val df = getDfWithNumericValues(session) + RatioOfSums("att1", "att2").calculate(df).value shouldBe Success(21.0 / 18.0) + } + + "fail to compute ratio of sums for non numeric type" in withSparkSession { sparkSession => + val df = getDfFull(sparkSession) + assert(RatioOfSums("att1", "att2").calculate(df).value.isFailure) + } + } } diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala index 05f4d47b..1000ff8e 100644 --- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala @@ -76,6 +76,8 @@ class AnalysisResultSerdeTest extends FlatSpec with Matchers { DoubleMetric(Entity.Column, "Completeness", "ColumnA", Success(5.0)), Sum("ColumnA") -> DoubleMetric(Entity.Column, "Completeness", "ColumnA", Success(5.0)), + RatioOfSums("ColumnA", "ColumnB") -> + DoubleMetric(Entity.Column, "RatioOfSums", "ColumnA", Success(5.0)), StandardDeviation("ColumnA") -> DoubleMetric(Entity.Column, "Completeness", "ColumnA", Success(5.0)), DataType("ColumnA") -> From 7bf0eeb066e182f9a516a2689efb282d53bc9998 Mon Sep 17 00:00:00 2001 From: Scott Gunn Date: Mon, 1 Apr 2024 14:15:30 -0400 Subject: [PATCH 2/5] Unit test for divide by zero and code cleanup. --- .../scala/com/amazon/deequ/analyzers/RatioOfSums.scala | 9 +-------- .../com/amazon/deequ/analyzers/AnalyzerTests.scala | 10 ++++++++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala index d80aee66..e9af4f8c 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala @@ -18,14 +18,7 @@ case class RatioOfSumsState( ) extends DoubleValuedState[RatioOfSumsState] { override def sum(other: RatioOfSumsState): RatioOfSumsState = { - val n1 = numerator - val n2 = other.numerator - val newN = n1 + n2 - val t1 = denominator - val t2 = other.denominator - val newD = t1 + t2 - - RatioOfSumsState(newN, newD) + RatioOfSumsState(numerator + other.numerator, denominator + other.denominator) } override def metricValue(): Double = { diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala index c24fb08f..be5bdc5a 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala @@ -848,8 +848,8 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with assert(analyzer.calculate(df).fullColumn.isDefined) } - "compute ratio of sums correctly for numeric data" in withSparkSession { session => - val df = getDfWithNumericValues(session) + "compute ratio of sums correctly for numeric data" in withSparkSession { sparkSession => + val df = getDfWithNumericValues(sparkSession) RatioOfSums("att1", "att2").calculate(df).value shouldBe Success(21.0 / 18.0) } @@ -858,6 +858,12 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with assert(RatioOfSums("att1", "att2").calculate(df).value.isFailure) } + "divide by zero" in withSparkSession { sparkSession => + val df = getDfWithNumericValues(sparkSession) + val testVal = RatioOfSums("att1", "att2", Some("item IN ('1', '2')")).calculate(df) + assert(testVal.value.isSuccess) + assert(testVal.value.toOption.get.isInfinite) + } } } From c18e349af70f4d203e8d64d528a683529dac8c7e Mon Sep 17 00:00:00 2001 From: Scott Gunn Date: Mon, 1 Apr 2024 14:27:28 -0400 Subject: [PATCH 3/5] More detailed Scaladoc --- .../com/amazon/deequ/analyzers/RatioOfSums.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala index e9af4f8c..3209be8f 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala @@ -26,13 +26,15 @@ case class RatioOfSumsState( } } -/** Sums up 2 columns and then divides the final values - * - * @param numerator - * First input column for computation - * @param denominator - * Second input column for computation - */ +/** Sums up 2 columns and then divides the final values as a Double. The columns + * can contain a mix of positive and negative numbers. Dividing by zero is allowed + * and will result in a value of Double.PositiveInfinity. + * + * @param numerator + * First input column for computation + * @param denominator + * Second input column for computation + */ case class RatioOfSums( numerator: String, denominator: String, From f76ce89e8bebf1bb8aa7116182df4de92934d740 Mon Sep 17 00:00:00 2001 From: Scott Gunn Date: Mon, 1 Apr 2024 14:41:00 -0400 Subject: [PATCH 4/5] Fixed docs to include Double.NegativeInfinity --- src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala index 3209be8f..7c6add79 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala @@ -28,7 +28,7 @@ case class RatioOfSumsState( /** Sums up 2 columns and then divides the final values as a Double. The columns * can contain a mix of positive and negative numbers. Dividing by zero is allowed - * and will result in a value of Double.PositiveInfinity. + * and will result in a value of Double.PositiveInfinity or Double.NegativeInfinity. * * @param numerator * First input column for computation From 39b0fac20703b1fee50b134294a2ba06a081c5d2 Mon Sep 17 00:00:00 2001 From: Scott Gunn Date: Wed, 10 Apr 2024 17:59:32 -0400 Subject: [PATCH 5/5] Add copyright to new file --- .../com/amazon/deequ/analyzers/RatioOfSums.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala index 7c6add79..593d358d 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/RatioOfSums.scala @@ -1,3 +1,19 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNumeric}