From 3a571721d45c89d3bddff6a177ee33b937cc30d6 Mon Sep 17 00:00:00 2001 From: rdsharma26 <65777064+rdsharma26@users.noreply.github.com> Date: Fri, 27 Oct 2023 13:28:47 -0400 Subject: [PATCH] Verify that non key columns exist in each dataset (#517) * Verify that non key columns exist in each dataset - The non existence of non key columns was resulting in a Spark SQL exception, instead of a graceful "ComparisonFailed" return value. - In a future PR, will consolidate all the column validation logic into one single place. * Fix failing build due to formatting issues. --- .../comparison/DataSynchronization.scala | 92 ++++++++++---- .../comparison/DataSynchronizationTest.scala | 117 ++++++++++++++++++ 2 files changed, 182 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala index 6b12339d..c5a82f76 100644 --- a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala +++ b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala @@ -98,7 +98,7 @@ object DataSynchronization extends ComparisonBase { if (columnErrors.isEmpty) { // Get all the non-key columns from DS1 and verify that they are present in DS2 val colsDS1 = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted - val nonKeyColsMatch = colsDS1.forall { col => Try { ds2(col) }.isSuccess } + val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { ComparisonFailed("Non key columns in the given data frames do not match.") @@ -131,12 +131,23 @@ object DataSynchronization extends ComparisonBase { colKeyMap: Map[String, String], compCols: Map[String, String], assertion: Double => Boolean): ComparisonResult = { - val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap) - if (columnErrors.isEmpty) { - val mergedMaps = colKeyMap ++ compCols - finalAssertion(ds1, ds2, mergedMaps, assertion) + val keyColumnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap) + if (keyColumnErrors.isEmpty) { + val nonKeyColumns1NotInDataset = compCols.keys.filterNot(columnExists(ds1, _)) + val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _)) + + if (nonKeyColumns1NotInDataset.nonEmpty) { + ComparisonFailed(s"The following columns were not found in the first dataset: " + + s"${nonKeyColumns1NotInDataset.mkString(", ")}") + } else if (nonKeyColumns2NotInDataset.nonEmpty) { + ComparisonFailed(s"The following columns were not found in the second dataset: " + + s"${nonKeyColumns2NotInDataset.mkString(", ")}") + } else { + val mergedMaps = colKeyMap ++ compCols + finalAssertion(ds1, ds2, mergedMaps, assertion) + } } else { - ComparisonFailed(columnErrors.get) + ComparisonFailed(keyColumnErrors.get) } } @@ -150,12 +161,27 @@ object DataSynchronization extends ComparisonBase { val compColsEither: Either[ComparisonFailed, Map[String, String]] = if (optionalCompCols.isDefined) { optionalCompCols.get match { case compCols if compCols.isEmpty => Left(ComparisonFailed("Empty column comparison map provided.")) - case compCols => Right(compCols) + case compCols => + val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _)) + val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _)) + if (ds1CompColsNotInDataset.nonEmpty) { + Left( + ComparisonFailed(s"The following columns were not found in the first dataset: " + + s"${ds1CompColsNotInDataset.mkString(", ")}") + ) + } else if (ds2CompColsNotInDataset.nonEmpty) { + Left( + ComparisonFailed(s"The following columns were not found in the second dataset: " + + s"${ds2CompColsNotInDataset.mkString(", ")}") + ) + } else { + Right(compCols) + } } } else { // Get all the non-key columns from DS1 and verify that they are present in DS2 val ds1NonKeyCols = ds1.columns.filterNot(x => colKeyMap.keys.toSeq.contains(x)).sorted - val nonKeyColsMatch = ds1NonKeyCols.forall { col => Try { ds2(col) }.isSuccess } + val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { Left(ComparisonFailed("Non key columns in the given data frames do not match.")) @@ -181,30 +207,40 @@ object DataSynchronization extends ComparisonBase { private def areKeyColumnsValid(ds1: DataFrame, ds2: DataFrame, colKeyMap: Map[String, String]): Option[String] = { - // We verify that the key columns provided form a valid primary/composite key. - // To achieve this, we group the dataframes and compare their count with the original count. - // If the key columns provided are valid, then the two counts should match. val ds1Cols = colKeyMap.keys.toSeq val ds2Cols = colKeyMap.values.toSeq - val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count() - val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count() - val ds1Count = ds1.count() - val ds2Count = ds2.count() - val ds1UniqueCount = ds1Unique.count() - val ds2UniqueCount = ds2Unique.count() + val ds1ColsNotInDataset = ds1Cols.filterNot(columnExists(ds1, _)) + val ds2ColsNotInDataset = ds2Cols.filterNot(columnExists(ds2, _)) - if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) { - None + if (ds1ColsNotInDataset.nonEmpty) { + Some(s"The following key columns were not found in the first dataset: ${ds1ColsNotInDataset.mkString(", ")}") + } else if (ds2ColsNotInDataset.nonEmpty) { + Some(s"The following key columns were not found in the second dataset: ${ds2ColsNotInDataset.mkString(", ")}") } else { - val combo1 = ds1Cols.mkString(", ") - val combo2 = ds2Cols.mkString(", ") - Some(s"The selected columns are not comparable due to duplicates present in the dataset." + - s"Comparison keys must be unique, but " + - s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," + - s" and " + - s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " + - s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2") + // We verify that the key columns provided form a valid primary/composite key. + // To achieve this, we group the dataframes and compare their count with the original count. + // If the key columns provided are valid, then the two counts should match. + val ds1Unique = ds1.groupBy(ds1Cols.map(col): _*).count() + val ds2Unique = ds2.groupBy(ds2Cols.map(col): _*).count() + + val ds1Count = ds1.count() + val ds2Count = ds2.count() + val ds1UniqueCount = ds1Unique.count() + val ds2UniqueCount = ds2Unique.count() + + if (ds1UniqueCount == ds1Count && ds2UniqueCount == ds2Count) { + None + } else { + val combo1 = ds1Cols.mkString(", ") + val combo2 = ds2Cols.mkString(", ") + Some(s"The selected columns are not comparable due to duplicates present in the dataset." + + s"Comparison keys must be unique, but " + + s"in Dataframe 1, there are $ds1UniqueCount unique records and $ds1Count rows," + + s" and " + + s"in Dataframe 2, there are $ds2UniqueCount unique records and $ds2Count rows, " + + s"based on the combination of keys {$combo1} in Dataframe 1 and {$combo2} in Dataframe 2") + } } } @@ -291,4 +327,6 @@ object DataSynchronization extends ComparisonBase { .drop(ds2HashColName) .drop(ds2KeyColsUpdatedNamesMap.values.toSeq: _*) } + + private def columnExists(df: DataFrame, col: String) = Try { df(col) }.isSuccess } diff --git a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala index 376a338b..932b3cba 100644 --- a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala +++ b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala @@ -686,5 +686,122 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val expected = Map(1 -> true, 2 -> true, 3 -> true, 4 -> true, 5 -> false, 6 -> false) assert(expected == rowLevelResults) } + + "fails as expected when key columns do not exist" in withSparkSession { spark => + val idColumnName = "id" + val ds1 = primaryDataset(spark, idColumnName) + val ds2 = referenceDataset(spark, idColumnName) + val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match + + val nonExistCol1 = "foo" + val nonExistCol2 = "bar" + + // Key columns not in either dataset (Overall) + val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2) + val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion) + + assert(overallResult1.isInstanceOf[ComparisonFailed]) + val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed] + assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset")) + assert(failedOverallResult1.errorMessage.contains(nonExistCol1)) + + // Key columns not in either dataset (Row level) + val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap1) + assert(rowLevelResult1.isLeft) + val failedRowLevelResult1 = rowLevelResult1.left.get + assert(failedRowLevelResult1.errorMessage.contains("key columns were not found in the first dataset")) + assert(failedRowLevelResult1.errorMessage.contains(nonExistCol1)) + + // Key column not in first dataset + val colKeyMap2 = Map(nonExistCol1 -> idColumnName) + val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion) + + assert(overallResult2.isInstanceOf[ComparisonFailed]) + val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed] + assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset")) + assert(failedOverallResult2.errorMessage.contains(nonExistCol1)) + + // Key column not in first dataset (Row level) + val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap2) + assert(rowLevelResult2.isLeft) + val failedRowLevelResult2 = rowLevelResult2.left.get + assert(failedRowLevelResult2.errorMessage.contains("key columns were not found in the first dataset")) + assert(failedRowLevelResult2.errorMessage.contains(nonExistCol1)) + + // Key column not in second dataset + val colKeyMap3 = Map(idColumnName -> nonExistCol2) + val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion) + + assert(overallResult3.isInstanceOf[ComparisonFailed]) + val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed] + assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset")) + assert(failedOverallResult3.errorMessage.contains(nonExistCol2)) + + // Key column not in second dataset (Row level) + val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap3) + assert(rowLevelResult3.isLeft) + val failedRowLevelResult3 = rowLevelResult3.left.get + assert(failedRowLevelResult3.errorMessage.contains("key columns were not found in the second dataset")) + assert(failedRowLevelResult3.errorMessage.contains(nonExistCol2)) + } + + "fails as expected when non-key columns do not exist" in withSparkSession { spark => + val idColumnName = "id" + val ds1 = primaryDataset(spark, idColumnName) + val ds2 = referenceDataset(spark, idColumnName) + val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match + val colKeyMap = Map(idColumnName -> idColumnName) + + val nonExistCol1 = "foo" + val nonExistCol2 = "bar" + + // Non-key columns not in either dataset (Overall) + val compColsMap1 = Map(nonExistCol1 -> nonExistCol2) + val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion) + + assert(overallResult1.isInstanceOf[ComparisonFailed]) + val failedOverallResult1 = overallResult1.asInstanceOf[ComparisonFailed] + assert(failedOverallResult1.errorMessage.contains( + s"The following columns were not found in the first dataset: $nonExistCol1")) + + // Non-key columns not in either dataset (Row level) + val rowLevelResult1 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap1)) + assert(rowLevelResult1.isLeft) + val failedRowLevelResult1 = rowLevelResult1.left.get + assert(failedRowLevelResult1.errorMessage.contains( + s"The following columns were not found in the first dataset: $nonExistCol1")) + + // Non-key column not in first dataset + val compColsMap2 = Map(nonExistCol1 -> "State") + val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion) + + assert(overallResult2.isInstanceOf[ComparisonFailed]) + val failedOverallResult2 = overallResult2.asInstanceOf[ComparisonFailed] + assert(failedOverallResult2.errorMessage.contains( + s"The following columns were not found in the first dataset: $nonExistCol1")) + + // Non-key columns not in first dataset (Row level) + val rowLevelResult2 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap2)) + assert(rowLevelResult2.isLeft) + val failedRowLevelResult2 = rowLevelResult2.left.get + assert(failedRowLevelResult2.errorMessage.contains( + s"The following columns were not found in the first dataset: $nonExistCol1")) + + // Non-key column not in second dataset + val compColsMap3 = Map("state" -> nonExistCol2) + val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion) + + assert(overallResult3.isInstanceOf[ComparisonFailed]) + val failedOverallResult3 = overallResult3.asInstanceOf[ComparisonFailed] + assert(failedOverallResult3.errorMessage.contains( + s"The following columns were not found in the second dataset: $nonExistCol2")) + + // Non-key column not in second dataset (Row level) + val rowLevelResult3 = DataSynchronization.columnMatchRowLevel(ds1, ds2, colKeyMap, Some(compColsMap3)) + assert(rowLevelResult3.isLeft) + val failedRowLevelResult3 = rowLevelResult3.left.get + assert(failedOverallResult3.errorMessage.contains( + s"The following columns were not found in the second dataset: $nonExistCol2")) + } } }