diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryUtil.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryUtil.java index 8bcb23d42..733703c18 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryUtil.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryUtil.java @@ -78,6 +78,7 @@ public class BigQueryUtil { public static final int DEFAULT_BIG_NUMERIC_PRECISION = 76; public static final int DEFAULT_BIG_NUMERIC_SCALE = 38; private static final int NO_VALUE = -1; + private static final long BIGQUERY_INTEGER_MIN_VALUE = Long.MIN_VALUE; static final ImmutableSet INTERNAL_ERROR_MESSAGES = ImmutableSet.of( "HTTP/2 error code: INTERNAL_ERROR", @@ -714,37 +715,6 @@ public static String prepareQueryForLog(String query, int maxLength) { : noNewLinesQuery; } - static String getMergeQueryForPartitionedTable( - String destinationTableName, - String temporaryTableName, - StandardTableDefinition destinationDefinition, - String extractedPartitionedSource, - String extractedPartitionedTarget) { - FieldList allFields = destinationDefinition.getSchema().getFields(); - String commaSeparatedFields = - allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`")); - String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L); - - String queryFormat = - "MERGE `%s` AS target\n" - + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n" - + "ON %s = %s AND %s\n" - + "WHEN MATCHED THEN DELETE\n" - + "WHEN NOT MATCHED AND NOT %s THEN\n" - + "INSERT(%s) VALUES(%s)"; - return String.format( - queryFormat, - destinationTableName, - temporaryTableName, - booleanInjectedColumn, - extractedPartitionedSource, - extractedPartitionedTarget, - booleanInjectedColumn, - booleanInjectedColumn, - commaSeparatedFields, - commaSeparatedFields); - } - 
static String getQueryForTimePartitionedTable( String destinationTableName, String temporaryTableName, @@ -829,12 +799,32 @@ static String getQueryForRangePartitionedTable( end, interval); - return getMergeQueryForPartitionedTable( + FieldList allFields = destinationDefinition.getSchema().getFields(); + String commaSeparatedFields = + allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`")); + String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L); + + String queryFormat = + "MERGE `%s` AS target\n" + + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n" + + "ON %s = %s AND %s AND (target.%s >= %d OR target.%s IS NULL )\n" + + "WHEN MATCHED THEN DELETE\n" + + "WHEN NOT MATCHED AND NOT %s THEN\n" + + "INSERT(%s) VALUES(%s)"; + return String.format( + queryFormat, destinationTableName, temporaryTableName, - destinationDefinition, + booleanInjectedColumn, extractedPartitionedSource, - extractedPartitionedTarget); + extractedPartitionedTarget, + booleanInjectedColumn, + partitionField, + BIGQUERY_INTEGER_MIN_VALUE, + partitionField, + booleanInjectedColumn, + commaSeparatedFields, + commaSeparatedFields); } // based on https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration, it diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java index 7ada5391e..684c4642f 100644 --- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java +++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java @@ -2119,7 +2119,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() { IntegrationTestUtils.runQuery( String.format( "CREATE TABLE `%s.%s` (%s INTEGER, %s 
INTEGER) " - + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) " + + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true) " + "AS SELECT * FROM UNNEST([(1, 1000), " + "(8, 1005), ( 21, 1010), (83, 1020)])", testDataset, testTable, orderId, orderCount)); @@ -2170,7 +2170,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt IntegrationTestUtils.runQuery( String.format( "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) " - + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) " + + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true) " + "AS SELECT * FROM UNNEST([(1, 1000), " + "(2, 1005), ( 150, 1010)])", testDataset, testTable, orderId, orderCount)); @@ -2205,7 +2205,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha IntegrationTestUtils.runQuery( String.format( "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) " - + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) " + + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true) " + "AS SELECT * FROM UNNEST([(1, 1000), " + "(2, 1005), ( -1, 1010)])", testDataset, testTable, orderId, orderCount)); @@ -2240,7 +2240,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() { IntegrationTestUtils.runQuery( String.format( "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) " - + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) " + + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true) " + "AS SELECT * FROM UNNEST([(1, 1000), " + "(11, 1005), ( 100, 1010)])", testDataset, testTable, orderId, orderCount)); @@ -2279,7 +2279,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() { IntegrationTestUtils.runQuery( String.format( "CREATE TABLE `%s.%s` (%s INTEGER, 
%s INTEGER) " - + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) " + + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true) " + "AS SELECT * FROM UNNEST([(NULL, 1000), " + "(11, 1005)])", testDataset, testTable, orderId, orderCount));