diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index 83dcfa433..7ada5391e 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -1579,7 +1579,8 @@ public void testWriteToTimestampField() {
     assertThat(head.get(head.fieldIndex("timestamp1"))).isEqualTo(timestamp1);
   }
 
-  protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
+  protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(
+      Dataset<Row> df, boolean isPartitioned) {
     df.write()
         .format("bigquery")
         .mode(SaveMode.Overwrite)
@@ -1591,6 +1592,13 @@ protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row>
         .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
         .save();
 
+    if (isPartitioned) {
+      IntegrationTestUtils.runQuery(
+          String.format(
+              "ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
+              testDataset, testTable));
+    }
+
     return spark
         .read()
         .format("bigquery")
@@ -1607,9 +1615,9 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
-                + "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')])",
+                + "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')]) ",
             testDataset, testTable, orderId, orderDateTime));
 
     Dataset<Row> df =
@@ -1621,7 +1629,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1651,7 +1659,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY DATE(order_date_time) "
+                + "PARTITION BY DATE(order_date_time) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1665,7 +1673,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1695,7 +1703,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (3, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1709,7 +1717,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1739,7 +1747,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2022-09-28 1:00:00 UTC'), "
                 + "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (2, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1753,7 +1761,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1783,7 +1791,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY order_date "
+                + "PARTITION BY order_date OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-09-29'))])",
             testDataset, testTable, orderId, orderDate));
 
@@ -1796,7 +1804,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1823,7 +1831,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, MONTH) "
+                + "PARTITION BY DATE_TRUNC(order_date, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-10-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1837,7 +1845,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1864,7 +1872,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
-                + "PARTITION BY DATE_TRUNC(order_date, YEAR) "
+                + "PARTITION BY DATE_TRUNC(order_date, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATE('2022-09-28')), "
                 + "(2, DATE('2023-10-29')), (2, DATE('2023-11-28'))])",
             testDataset, testTable, orderId, orderDate));
@@ -1878,7 +1886,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
            StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
            StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1906,7 +1914,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1920,7 +1928,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1948,7 +1956,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, DAY) "
+                + "PARTITION BY timestamp_trunc(order_date_time, DAY) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-09-29 10:00:00'), (3, DATETIME '2023-09-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -1962,7 +1970,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -1990,7 +1998,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+                + "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-10-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2004,7 +2012,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2032,7 +2040,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
-                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+                + "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
                 + "AS SELECT * FROM UNNEST([(1, DATETIME '2022-09-28 1:00:00'), "
                 + "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-11-29 17:30:00')])",
             testDataset, testTable, orderId, orderDateTime));
@@ -2046,7 +2054,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2086,7 +2094,7 @@ public void testOverwriteDynamicPartition_noTimePartitioning() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, false);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2127,7 +2135,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(5);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2174,7 +2182,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2209,7 +2217,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(2);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2244,7 +2252,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
     rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
@@ -2283,7 +2291,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
             StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
             StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));
 
-    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
+    Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
     assertThat(result.count()).isEqualTo(3);
     List<Row> rows = result.collectAsList();
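
Note on the new isPartitioned branch (a minimal sketch, assuming only the testDataset/testTable fixtures and the IntegrationTestUtils.runQuery helper already used above; the SELECT statements and table name below are illustrative, not taken from the diff): once a table is created with require_partition_filter = true, BigQuery rejects any query that does not constrain the partitioning column, and that includes the connector's full-table read-back used to verify the overwrite.

    // Sketch of the require_partition_filter round-trip these tests rely on.
    // 1. An unfiltered scan of a filter-required table fails server-side, roughly:
    //      SELECT * FROM test_dataset.orders
    //      => "Cannot query over table ... without a filter ... for partition elimination"
    // 2. A scan constrained on the partitioning column is accepted:
    //      SELECT * FROM test_dataset.orders WHERE DATE(order_date_time) = '2023-09-28'
    // 3. The tests instead clear the option so the verification read can scan the whole table:
    IntegrationTestUtils.runQuery(
        String.format(
            "ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
            testDataset, testTable));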