From fc0ffb897aa489ea506e2679518bb2401386ef62 Mon Sep 17 00:00:00 2001
From: Isha Tarte
Date: Thu, 5 Dec 2024 11:38:14 -0800
Subject: [PATCH 1/4] Integration tests with require_partition_filter = true
 for DPO time partitioned tables

---
 .../integration/WriteIntegrationTestBase.java | 31 +++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index 83dcfa433..f4d39daf9 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -1591,6 +1591,11 @@ protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
         .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
         .save();

+    IntegrationTestUtils.runQuery(
+        String.format(
+            "ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
+            testDataset, testTable));
+
     return spark
         .read()
         .format("bigquery")
@@ -1607,9 +1612,9 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
-                + "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')])",
+                + "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')]) ",
             testDataset, testTable, orderId, orderDateTime));

     Dataset<Row> df =
@@ -1651,7 +1656,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY DATE(order_date_time) "
+                + "PARTITION BY DATE(order_date_time) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1695,7 +1700,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (3, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1739,7 +1744,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2022-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (2, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1783,7 +1788,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY order_date "
+                + "PARTITION BY order_date OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-09-29'))])",
             testDataset, testTable, orderId, orderDate));

@@ -1823,7 +1828,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, MONTH) "
+                + "PARTITION BY DATE_TRUNC(order_date, MONTH) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-10-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1864,7 +1869,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, YEAR) "
+                + "PARTITION BY DATE_TRUNC(order_date, YEAR) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATE('2022-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-11-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1905,7 +1910,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
     IntegrationTestUtils.runQuery(
         String.format(
-            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) OPTIONS (require_partition_filter = true)"
                 + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
@@ -1948,7 +1953,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, DAY) "
+                + "PARTITION BY timestamp_trunc(order_date_time, DAY) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-29 10:00:00'), (3, DATETIME '2023-09-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1990,7 +1995,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-10-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2032,7 +2037,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2022-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-11-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2111,7 +2116,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10))"
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(8, 1005), ( 21, 1010), (83, 1020)])",
             testDataset, testTable, orderId, orderCount));

From 356c1ad23221f3829237ce492a42caa1df56e6f0 Mon Sep 17 00:00:00 2001
From: Isha Tarte
Date: Thu, 5 Dec 2024 13:43:23 -0800
Subject: [PATCH 2/4] spacing fix

---
 .../integration/WriteIntegrationTestBase.java | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index f4d39daf9..79290091b 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -1612,7 +1612,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')]) ",
             testDataset, testTable, orderId, orderDateTime));
@@ -1656,7 +1656,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY DATE(order_date_time) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY DATE(order_date_time) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1700,7 +1700,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (3, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1744,7 +1744,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2022-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (2, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1788,7 +1788,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY order_date OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY order_date OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-09-29'))])",
             testDataset, testTable, orderId, orderDate));

@@ -1828,7 +1828,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, MONTH) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY DATE_TRUNC(order_date, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-10-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1869,7 +1869,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, YEAR) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY DATE_TRUNC(order_date, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2022-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-11-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1910,7 +1910,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
     IntegrationTestUtils.runQuery(
         String.format(
-            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) OPTIONS (require_partition_filter = true)"
+            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) OPTIONS (require_partition_filter = true) "
                 + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
@@ -1953,7 +1953,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, DAY) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, DAY) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-29 10:00:00'), (3, DATETIME '2023-09-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1995,7 +1995,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-10-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2037,7 +2037,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true)"
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2022-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-11-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2116,7 +2116,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10))"
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(8, 1005), ( 21, 1010), (83, 1020)])",
             testDataset, testTable, orderId, orderCount));

From bf1700572c6e0053b36def8475172d7abc2260d3 Mon Sep 17 00:00:00 2001
From: Isha Tarte
Date: Thu, 5 Dec 2024 15:51:41 -0800
Subject: [PATCH 3/4] fix no partition test

---
 .../integration/WriteIntegrationTestBase.java | 47 ++++++++++---------
 1 file changed, 25 insertions(+), 22 deletions(-)

diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index 79290091b..7e1c3d645 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -1579,7 +1579,8 @@ public void testWriteToTimestampField() {
     assertThat(head.get(head.fieldIndex("timestamp1"))).isEqualTo(timestamp1);
   }

-  protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
+  protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(
+      Dataset<Row> df, boolean isPartitioned) {
     df.write()
         .format("bigquery")
         .mode(SaveMode.Overwrite)
@@ -1591,10 +1592,12 @@ protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
         .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
         .save();

-    IntegrationTestUtils.runQuery(
-        String.format(
-            "ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
-            testDataset, testTable));
+    if (isPartitioned) {
+      IntegrationTestUtils.runQuery(
+          String.format(
+              "ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
+              testDataset, testTable));
+    }

     return spark
         .read()
         .format("bigquery")
@@ -1626,7 +1629,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1670,7 +1673,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1714,7 +1717,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1758,7 +1761,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1801,7 +1804,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1842,7 +1845,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1883,7 +1886,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1925,7 +1928,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1967,7 +1970,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2009,7 +2012,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2051,7 +2054,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2091,7 +2094,7 @@ public void testOverwriteDynamicPartition_noTimePartitioning() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, false);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2132,7 +2135,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(5);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2179,7 +2182,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2214,7 +2217,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2249,7 +2252,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2288,7 +2291,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();

From 9cada49956b24c974f1d46a86d93a2a376ac63b9 Mon Sep 17 00:00:00 2001
From: Isha Tarte
Date: Fri, 6 Dec 2024 11:44:20 -0800
Subject: [PATCH 4/4] fix test

---
 .../spark/bigquery/integration/WriteIntegrationTestBase.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index 7e1c3d645..7ada5391e 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -1913,8 +1913,8 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
     IntegrationTestUtils.runQuery(
         String.format(
-            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) OPTIONS (require_partition_filter = true) "
-                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+            "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
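
Note on the option these patches exercise: with require_partition_filter = true,
BigQuery rejects any query on the table that lacks a filter usable for partition
elimination, which is why writeAndLoadDatasetOverwriteDynamicPartition relaxes
the option before its unfiltered verification read. A minimal standalone SQL
sketch of that behavior (the dataset and table names below are placeholders,
not values from the patches):

    -- Partitioned table that refuses unfiltered scans.
    CREATE TABLE `my_dataset.orders` (order_id INT64, order_date_time TIMESTAMP)
    PARTITION BY TIMESTAMP_TRUNC(order_date_time, DAY)
    OPTIONS (require_partition_filter = true);

    -- Rejected: no filter on the partitioning column.
    SELECT * FROM `my_dataset.orders`;

    -- Accepted: the WHERE clause can be used for partition elimination.
    SELECT * FROM `my_dataset.orders`
    WHERE order_date_time >= TIMESTAMP '2023-09-28 00:00:00 UTC';

    -- What the test helper runs before reading the table back without a filter.
    ALTER TABLE `my_dataset.orders` SET OPTIONS (require_partition_filter = false);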