Add commits from master branch to release/2.0.8-spark-3.5 (#587)
* Replace 'withColumns' with 'select' (#582)

'withColumns' was introduced in Spark 3.3, so it is not available in
Deequ's builds for Spark versions below 3.3. (See the equivalence sketch
after the VerificationResult.scala diff below.)

* Replace RDD operations with DataFrame functions in the Histogram analyzer (#586)
  (see the top-N sketch after the Histogram.scala diff below)

Co-authored-by: Shriya Vanvari <[email protected]>

* Updated version in pom.xml to 2.0.8-spark-3.5 (#578)

Co-authored-by: Yannis Mentekidis <[email protected]>

---------

Co-authored-by: Josh <[email protected]>
Co-authored-by: Shriya Vanvari <[email protected]>
Co-authored-by: Shriya Vanvari <[email protected]>
Co-authored-by: Yannis Mentekidis <[email protected]>
Co-authored-by: Yannis Mentekidis <[email protected]>
6 people authored Oct 9, 2024
1 parent ff3cb69 commit 7c76c59
Showing 2 changed files with 21 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/com/amazon/deequ/VerificationResult.scala
@@ -31,7 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
 
 import java.util.UUID
 
@@ -96,9 +96,10 @@ object VerificationResult {
       data: DataFrame): DataFrame = {
 
     val columnNamesToMetrics: Map[String, Column] = verificationResultToColumn(verificationResult)
+    val columnsAliased = columnNamesToMetrics.toSeq.map { case (name, col) => col.as(name) }
 
     val dataWithID = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
-    dataWithID.withColumns(columnNamesToMetrics).drop(UNIQUENESS_ID)
+    dataWithID.select(col("*") +: columnsAliased: _*).drop(UNIQUENESS_ID)
   }
 
   def checkResultsAsJson(verificationResult: VerificationResult,
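For illustration only (not from the repository): a minimal, self-contained Scala sketch of the withColumns-to-select equivalence that the change above relies on. The DataFrame, the metric columns, and the object name are made-up examples standing in for Deequ's row-level results; withColumns(Map[String, Column]) exists only from Spark 3.3 onwards, while the select-based form also works on earlier versions.

// Minimal sketch: the DataFrame and "metric" columns are hypothetical
// stand-ins for Deequ's row-level results.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object WithColumnsVsSelectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val data = Seq(("a", 1), ("b", -2)).toDF("item", "value")

    // Hypothetical metric columns, keyed by the name they should appear under.
    val metrics = Map(
      "value_is_positive" -> (col("value") > 0),
      "item_is_complete"  -> col("item").isNotNull
    )

    // Spark 3.3+ only:
    // data.withColumns(metrics)

    // Equivalent form that also works on Spark versions before 3.3:
    // alias each metric column and append it after all existing columns.
    val aliased = metrics.toSeq.map { case (name, metric) => metric.as(name) }
    data.select(col("*") +: aliased: _*).show()

    spark.stop()
  }
}

Both forms yield the original columns followed by one aliased metric column per map entry, since both iterate the same map.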
21 changes: 18 additions & 3 deletions src/main/scala/com/amazon/deequ/analyzers/Histogram.scala
@@ -84,11 +84,26 @@ case class Histogram(
       case Some(theState) =>
         val value: Try[Distribution] = Try {
 
-          val topNRows = theState.frequencies.rdd.top(maxDetailBins)(OrderByAbsoluteCount)
+          val countColumnName = theState.frequencies.schema.fields
+            .find(field => field.dataType == LongType && field.name != column)
+            .map(_.name)
+            .getOrElse(throw new IllegalStateException(s"Count column not found in the frequencies DataFrame"))
+
+          val topNRowsDF = theState.frequencies
+            .orderBy(col(countColumnName).desc)
+            .limit(maxDetailBins)
+            .collect()
+
           val binCount = theState.frequencies.count()
 
-          val histogramDetails = topNRows
-            .map { case Row(discreteValue: String, absolute: Long) =>
+          val columnName = theState.frequencies.columns
+            .find(_ == column)
+            .getOrElse(throw new IllegalStateException(s"Column $column not found"))
+
+          val histogramDetails = topNRowsDF
+            .map { row =>
+              val discreteValue = row.getAs[String](columnName)
+              val absolute = row.getAs[Long](countColumnName)
               val ratio = absolute.toDouble / theState.numRows
               discreteValue -> DistributionValue(absolute, ratio)
             }
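For illustration only (not from the repository): a minimal Scala sketch contrasting the old RDD-based top-N with the DataFrame-based ordering now used above. The frequencies DataFrame, its column names, and the object name are made-up examples standing in for the Histogram analyzer's internal state.

// Minimal sketch: the frequencies DataFrame and its column names are
// hypothetical stand-ins for the Histogram analyzer's internal state.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col

object TopNFrequenciesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val frequencies = Seq(("red", 5L), ("green", 2L), ("blue", 9L)).toDF("color", "count")
    val maxDetailBins = 2

    // Old style: drop to the RDD API and supply an explicit Ordering.
    val viaRdd: Array[Row] = frequencies.rdd
      .top(maxDetailBins)(Ordering.by[Row, Long](_.getAs[Long]("count")))

    // New style: stay in the DataFrame API and let Spark plan the sort + limit.
    val viaDataFrame: Array[Row] = frequencies
      .orderBy(col("count").desc)
      .limit(maxDetailBins)
      .collect()

    // Both should surface the two most frequent values: blue (9) and red (5).
    println(viaRdd.mkString(", "))
    println(viaDataFrame.mkString(", "))

    spark.stop()
  }
}

Staying in the DataFrame API avoids the custom Ordering that rdd.top required and leaves the sort-and-limit planning to Spark.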
