Skip to content

Commit

Permalink
Integration tests with require_partition_filter = true for DPO time partitioned tables (#1322)
Browse files Browse the repository at this point in the history
  • Loading branch information
isha97 authored Dec 6, 2024
1 parent b842be6 commit 83c83e7
Showing 1 changed file with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,8 @@ public void testWriteToTimestampField() {
assertThat(head.get(head.fieldIndex("timestamp1"))).isEqualTo(timestamp1);
}

protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row> df) {
protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(
Dataset<Row> df, boolean isPartitioned) {
df.write()
.format("bigquery")
.mode(SaveMode.Overwrite)
Expand All @@ -1591,6 +1592,13 @@ protected Dataset<Row> writeAndLoadDatasetOverwriteDynamicPartition(Dataset<Row>
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.save();

if (isPartitioned) {
IntegrationTestUtils.runQuery(
String.format(
"ALTER TABLE %s.%s SET OPTIONS (require_partition_filter = false)",
testDataset, testTable));
}

return spark
.read()
.format("bigquery")
Expand All @@ -1607,9 +1615,9 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
+ "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+ "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
+ "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')])",
+ "(2, TIMESTAMP '2023-09-28 10:00:00 UTC'), (3, TIMESTAMP '2023-09-28 10:30:00 UTC')]) ",
testDataset, testTable, orderId, orderDateTime));

Dataset<Row> df =
Expand All @@ -1621,7 +1629,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByHour() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1651,7 +1659,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
+ "PARTITION BY DATE(order_date_time) "
+ "PARTITION BY DATE(order_date_time) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
+ "(2, TIMESTAMP '2023-09-29 10:00:00 UTC'), (3, TIMESTAMP '2023-09-29 17:00:00 UTC')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -1665,7 +1673,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1695,7 +1703,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
+ "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+ "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2023-09-28 1:00:00 UTC'), "
+ "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (3, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -1709,7 +1717,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1739,7 +1747,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s TIMESTAMP) "
+ "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+ "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, TIMESTAMP '2022-09-28 1:00:00 UTC'), "
+ "(2, TIMESTAMP '2023-10-20 10:00:00 UTC'), (2, TIMESTAMP '2023-10-25 12:00:00 UTC')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -1753,7 +1761,7 @@ public void testOverwriteDynamicPartition_partitionTimestampByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1783,7 +1791,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
+ "PARTITION BY order_date "
+ "PARTITION BY order_date OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), (2, DATE('2023-09-29'))])",
testDataset, testTable, orderId, orderDate));

Expand All @@ -1796,7 +1804,7 @@ public void testOverwriteDynamicPartition_partitionDateByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand All @@ -1823,7 +1831,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
+ "PARTITION BY DATE_TRUNC(order_date, MONTH) "
+ "PARTITION BY DATE_TRUNC(order_date, MONTH) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATE('2023-09-28')), "
+ "(2, DATE('2023-10-29')), (2, DATE('2023-10-28'))])",
testDataset, testTable, orderId, orderDate));
Expand All @@ -1837,7 +1845,7 @@ public void testOverwriteDynamicPartition_partitionDateByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand All @@ -1864,7 +1872,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATE) "
+ "PARTITION BY DATE_TRUNC(order_date, YEAR) "
+ "PARTITION BY DATE_TRUNC(order_date, YEAR) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATE('2022-09-28')), "
+ "(2, DATE('2023-10-29')), (2, DATE('2023-11-28'))])",
testDataset, testTable, orderId, orderDate));
Expand All @@ -1878,7 +1886,7 @@ public void testOverwriteDynamicPartition_partitionDateByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDate, DataTypes.DateType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1906,7 +1914,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+ "PARTITION BY timestamp_trunc(order_date_time, HOUR) "
+ "PARTITION BY timestamp_trunc(order_date_time, HOUR) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
+ "(2, DATETIME '2023-09-28 10:00:00'), (3, DATETIME '2023-09-28 10:30:00')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -1920,7 +1928,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByHour() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1948,7 +1956,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+ "PARTITION BY timestamp_trunc(order_date_time, DAY) "
+ "PARTITION BY timestamp_trunc(order_date_time, DAY) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
+ "(2, DATETIME '2023-09-29 10:00:00'), (3, DATETIME '2023-09-29 17:30:00')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -1962,7 +1970,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByDay() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -1990,7 +1998,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+ "PARTITION BY timestamp_trunc(order_date_time, MONTH) "
+ "PARTITION BY timestamp_trunc(order_date_time, MONTH) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATETIME '2023-09-28 1:00:00'), "
+ "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-10-29 17:30:00')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -2004,7 +2012,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByMonth() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2032,7 +2040,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
IntegrationTestUtils.runQuery(
String.format(
"CREATE TABLE `%s.%s` (%s INTEGER, %s DATETIME) "
+ "PARTITION BY timestamp_trunc(order_date_time, YEAR) "
+ "PARTITION BY timestamp_trunc(order_date_time, YEAR) OPTIONS (require_partition_filter = true) "
+ "AS SELECT * FROM UNNEST([(1, DATETIME '2022-09-28 1:00:00'), "
+ "(2, DATETIME '2023-10-29 10:00:00'), (3, DATETIME '2023-11-29 17:30:00')])",
testDataset, testTable, orderId, orderDateTime));
Expand All @@ -2046,7 +2054,7 @@ public void testOverwriteDynamicPartition_partitionDateTimeByYear() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, timeStampNTZType.get(), true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2086,7 +2094,7 @@ public void testOverwriteDynamicPartition_noTimePartitioning() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderDateTime, DataTypes.TimestampType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, false);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2127,7 +2135,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(5);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2174,7 +2182,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2209,7 +2217,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(2);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2244,7 +2252,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);
List<Row> rows = result.collectAsList();
rows.sort(Comparator.comparing(row -> row.getLong(row.fieldIndex(orderId))));
Expand Down Expand Up @@ -2283,7 +2291,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
StructField.apply(orderId, DataTypes.IntegerType, true, Metadata.empty()),
StructField.apply(orderCount, DataTypes.IntegerType, true, Metadata.empty())));

Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df);
Dataset<Row> result = writeAndLoadDatasetOverwriteDynamicPartition(df, true);
assertThat(result.count()).isEqualTo(3);

List<Row> rows = result.collectAsList();
Expand Down

0 comments on commit 83c83e7

Please sign in to comment.