From 7c76c595c82a1775c642cee023d6761067018b04 Mon Sep 17 00:00:00 2001 From: Edward Cho <114528615+eycho-am@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:40:30 -0400 Subject: [PATCH] Add commits from master branch to release/2.0.8-spark-3.5 (#587) * Replace 'withColumns' with 'select' (#582) 'withColumns' was introduced in Spark 3.3, so it won't work for Deequ's <3.3 builds. * Replace rdd with dataframe functions in Histogram analyzer (#586) Co-authored-by: Shriya Vanvari * Updated version in pom.xml to 2.0.8-spark-3.5 (#578) Co-authored-by: Yannis Mentekidis --------- Co-authored-by: Josh <5685731+marcantony@users.noreply.github.com> Co-authored-by: Shriya Vanvari Co-authored-by: Shriya Vanvari Co-authored-by: Yannis Mentekidis Co-authored-by: Yannis Mentekidis --- .../com/amazon/deequ/VerificationResult.scala | 5 +++-- .../amazon/deequ/analyzers/Histogram.scala | 21 ++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala index 418a622e..b9b450f2 100644 --- a/src/main/scala/com/amazon/deequ/VerificationResult.scala +++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala @@ -31,7 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.functions.{col, monotonically_increasing_id} import java.util.UUID @@ -96,9 +96,10 @@ object VerificationResult { data: DataFrame): DataFrame = { val columnNamesToMetrics: Map[String, Column] = verificationResultToColumn(verificationResult) + val columnsAliased = columnNamesToMetrics.toSeq.map { case (name, col) => col.as(name) } val dataWithID = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id()) - dataWithID.withColumns(columnNamesToMetrics).drop(UNIQUENESS_ID) + dataWithID.select(col("*") +: columnsAliased: _*).drop(UNIQUENESS_ID) } def checkResultsAsJson(verificationResult: VerificationResult, diff --git a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala index 742b2ba6..fbdb5b2b 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala @@ -84,11 +84,26 @@ case class Histogram( case Some(theState) => val value: Try[Distribution] = Try { - val topNRows = theState.frequencies.rdd.top(maxDetailBins)(OrderByAbsoluteCount) + val countColumnName = theState.frequencies.schema.fields + .find(field => field.dataType == LongType && field.name != column) + .map(_.name) + .getOrElse(throw new IllegalStateException(s"Count column not found in the frequencies DataFrame")) + + val topNRowsDF = theState.frequencies + .orderBy(col(countColumnName).desc) + .limit(maxDetailBins) + .collect() + val binCount = theState.frequencies.count() - val histogramDetails = topNRows - .map { case Row(discreteValue: String, absolute: Long) => + val columnName = theState.frequencies.columns + .find(_ == column) + .getOrElse(throw new IllegalStateException(s"Column $column not found")) + + val histogramDetails = topNRowsDF + .map { row => + val discreteValue = row.getAs[String](columnName) + val absolute = row.getAs[Long](countColumnName) val ratio = absolute.toDouble / theState.numRows discreteValue -> DistributionValue(absolute, ratio) }