Support require_partition_filter=true for range-partitioned tables in dynamic partition overwrite (DPO) #1326

Open. Wants to merge 2 commits into base: master.
BigQueryUtil.java

@@ -78,6 +78,7 @@ public class BigQueryUtil {
   public static final int DEFAULT_BIG_NUMERIC_PRECISION = 76;
   public static final int DEFAULT_BIG_NUMERIC_SCALE = 38;
   private static final int NO_VALUE = -1;
+  private static final long BIGQUERY_INTEGER_MIN_VALUE = Long.MIN_VALUE;
   static final ImmutableSet<String> INTERNAL_ERROR_MESSAGES =
       ImmutableSet.of(
           "HTTP/2 error code: INTERNAL_ERROR",
@@ -714,37 +715,6 @@ public static String prepareQueryForLog(String query, int maxLength) {
         : noNewLinesQuery;
   }
 
-  static String getMergeQueryForPartitionedTable(
-      String destinationTableName,
-      String temporaryTableName,
-      StandardTableDefinition destinationDefinition,
-      String extractedPartitionedSource,
-      String extractedPartitionedTarget) {
-    FieldList allFields = destinationDefinition.getSchema().getFields();
-    String commaSeparatedFields =
-        allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`"));
-    String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L);
-
-    String queryFormat =
-        "MERGE `%s` AS target\n"
-            + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n"
-            + "ON %s = %s AND %s\n"
-            + "WHEN MATCHED THEN DELETE\n"
-            + "WHEN NOT MATCHED AND NOT %s THEN\n"
-            + "INSERT(%s) VALUES(%s)";
-    return String.format(
-        queryFormat,
-        destinationTableName,
-        temporaryTableName,
-        booleanInjectedColumn,
-        extractedPartitionedSource,
-        extractedPartitionedTarget,
-        booleanInjectedColumn,
-        booleanInjectedColumn,
-        commaSeparatedFields,
-        commaSeparatedFields);
-  }
-
   static String getQueryForTimePartitionedTable(
       String destinationTableName,
       String temporaryTableName,
@@ -829,12 +799,32 @@ static String getQueryForRangePartitionedTable(
             end,
             interval);
 
-    return getMergeQueryForPartitionedTable(
+    FieldList allFields = destinationDefinition.getSchema().getFields();
+    String commaSeparatedFields =
+        allFields.stream().map(Field::getName).collect(Collectors.joining("`,`", "`", "`"));
+    String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L);
+
+    String queryFormat =
+        "MERGE `%s` AS target\n"
+            + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n"
+            + "ON %s = %s AND %s AND (target.%s >= %d OR target.%s IS NULL )\n"
+            + "WHEN MATCHED THEN DELETE\n"
+            + "WHEN NOT MATCHED AND NOT %s THEN\n"
+            + "INSERT(%s) VALUES(%s)";
+    return String.format(
+        queryFormat,
         destinationTableName,
         temporaryTableName,
-        destinationDefinition,
+        booleanInjectedColumn,
         extractedPartitionedSource,
-        extractedPartitionedTarget);
+        extractedPartitionedTarget,
+        booleanInjectedColumn,
+        partitionField,
+        BIGQUERY_INTEGER_MIN_VALUE,
+        partitionField,
+        booleanInjectedColumn,
+        commaSeparatedFields,
+        commaSeparatedFields);
   }
 
   // based on https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration, it
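For reference, here is a minimal, self-contained sketch of the MERGE statement the new format string assembles. The table names, the RANGE_BUCKET expressions standing in for extractedPartitionedSource and extractedPartitionedTarget, and the column list are hypothetical stand-ins, not values taken from this PR.

// Hedged illustration of the generated query; all concrete values below are assumptions.
public class RangePartitionedMergeSketch {
  public static void main(String[] args) {
    String booleanInjectedColumn = "_" + Long.toString(1234567890123456789L);
    String queryFormat =
        "MERGE `%s` AS target\n"
            + "USING (SELECT * FROM `%s` CROSS JOIN UNNEST([true, false]) %s) AS source\n"
            + "ON %s = %s AND %s AND (target.%s >= %d OR target.%s IS NULL )\n"
            + "WHEN MATCHED THEN DELETE\n"
            + "WHEN NOT MATCHED AND NOT %s THEN\n"
            + "INSERT(%s) VALUES(%s)";
    String merge =
        String.format(
            queryFormat,
            "my_project.my_dataset.orders", // destinationTableName (hypothetical)
            "my_project.my_dataset.orders_tmp", // temporaryTableName (hypothetical)
            booleanInjectedColumn,
            "RANGE_BUCKET(source.order_id, GENERATE_ARRAY(1, 100, 10))", // extractedPartitionedSource (assumed shape)
            "RANGE_BUCKET(target.order_id, GENERATE_ARRAY(1, 100, 10))", // extractedPartitionedTarget (assumed shape)
            booleanInjectedColumn,
            "order_id", // partitionField
            Long.MIN_VALUE, // BIGQUERY_INTEGER_MIN_VALUE
            "order_id",
            booleanInjectedColumn,
            "`order_id`,`order_count`", // commaSeparatedFields (hypothetical schema)
            "`order_id`,`order_count`");
    System.out.println(merge);
  }
}

The printed ON clause ends with (target.order_id >= -9223372036854775808 OR target.order_id IS NULL ), which is the always-true partition filter that keeps the MERGE valid against tables created with require_partition_filter = true.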
Integration tests (testOverwriteDynamicPartition_* cases):

@@ -2119,7 +2119,7 @@ public void testOverwriteDynamicPartition_rangePartitioned() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(8, 1005), ( 21, 1010), (83, 1020)])",
             testDataset, testTable, orderId, orderCount));
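For context on the OPTIONS (require_partition_filter = true) clause added above: BigQuery then rejects any statement that scans the table without a filter on the partition column, which is the situation the new (target.order_id >= Long.MIN_VALUE OR target.order_id IS NULL) predicate in the generated MERGE is meant to satisfy. A hedged, stand-alone sketch follows; the fully qualified table name is hypothetical, while the tests themselves build names from testDataset and testTable.

// Illustration only; not part of this PR.
public class RequirePartitionFilterSketch {
  public static void main(String[] args) {
    String table = "my_project.my_dataset.orders"; // hypothetical table created with require_partition_filter = true
    // Rejected by BigQuery on such a table, with an error along the lines of
    // "Cannot query over table ... without a filter over column(s) 'order_id'".
    String unfiltered = String.format("SELECT COUNT(*) FROM `%s`", table);
    // Accepted: the WHERE clause filters on the partition column.
    String filtered = String.format("SELECT COUNT(*) FROM `%s` WHERE order_id >= 1", table);
    System.out.println(unfiltered);
    System.out.println(filtered);
  }
}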
@@ -2170,7 +2170,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeLessThanSt
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(2, 1005), ( 150, 1010)])",
             testDataset, testTable, orderId, orderCount));
@@ -2205,7 +2205,7 @@ public void testOverwriteDynamicPartition_rangePartitionedOutsideRangeGreaterTha
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(2, 1005), ( -1, 1010)])",
             testDataset, testTable, orderId, orderCount));
@@ -2240,7 +2240,7 @@ public void testOverwriteDynamicPartition_rangePartitionedBoundaryCondition() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(1, 1000), "
                 + "(11, 1005), ( 100, 1010)])",
             testDataset, testTable, orderId, orderCount));
@@ -2279,7 +2279,7 @@ public void testOverwriteDynamicPartition_rangePartitionedWithNulls() {
     IntegrationTestUtils.runQuery(
         String.format(
             "CREATE TABLE `%s.%s` (%s INTEGER, %s INTEGER) "
-                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) "
+                + "PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(1, 100, 10)) OPTIONS (require_partition_filter = true)"
                 + "AS SELECT * FROM UNNEST([(NULL, 1000), "
                 + "(11, 1005)])",
             testDataset, testTable, orderId, orderCount));
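A final hedged note, prompted by the WithNulls case above: in BigQuery SQL, NULL >= -9223372036854775808 evaluates to NULL rather than TRUE, so rows in the __NULL__ partition (such as the (NULL, 1000) row created here) would never match a bare target.order_id >= Long.MIN_VALUE comparison. The OR target.order_id IS NULL branch of the new predicate is what covers them. The sketch below just prints that predicate for the tests' partition column.

// Illustration only; not part of this PR.
public class NullPartitionPredicateSketch {
  public static void main(String[] args) {
    String partitionField = "order_id"; // the partition column used in the tests above
    // Mirrors the ON-clause fragment added in BigQueryUtil.
    String predicate =
        String.format(
            "(target.%s >= %d OR target.%s IS NULL )",
            partitionField, Long.MIN_VALUE, partitionField);
    System.out.println(predicate);
  }
}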