diff --git a/integrations/java/openhouse-java-itest/src/test/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperationsTest.java b/integrations/java/openhouse-java-itest/src/test/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperationsTest.java index 30471a78..b5e01aed 100644 --- a/integrations/java/openhouse-java-itest/src/test/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperationsTest.java +++ b/integrations/java/openhouse-java-itest/src/test/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperationsTest.java @@ -247,4 +247,87 @@ public void testColumnPolicyTagsExistUpdateExistingPolicyTags() { Assertions.assertTrue(updatedPolicies.getColumnTags().containsKey("col1")); Assertions.assertEquals(tagHC, updatedPolicies.getColumnTags().get("col1").getTags()); } + + @Test + public void testPoliciesReplicationExistsButNoUpdateEmptyInterval() { + Map props = new HashMap<>(); + props.put( + "policies", "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"\"}]}}"); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.properties()).thenReturn(props); + OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class); + when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod(); + Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata); + Assertions.assertNotNull(updatedPolicies); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a"); + Assertions.assertTrue( + updatedPolicies.getReplication().getConfig().get(0).getInterval().isEmpty()); + Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1); + } + + @Test + public void testNoPoliciesReplicationButUpdateExists() { + Map props = new HashMap<>(); + props.put( + "updated.openhouse.policy", + "{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"1D\"}]}}"); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.properties()).thenReturn(props); + OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class); + when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod(); + Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata); + Assertions.assertNotNull(updatedPolicies); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa"); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D"); + Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1); + } + + @Test + public void testPoliciesReplicationExistsUpdateExists() { + Map props = new HashMap<>(); + props.put( + "policies", + "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"b\", \"interval\":\"1D\"}]}}"); + props.put( + "updated.openhouse.policy", + "{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"2D\"}]}}"); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.properties()).thenReturn(props); + OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class); + when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod(); + Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa"); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getInterval(), "2D"); + Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1); + } + + @Test + public void testPoliciesReplicationExistsUpdateExistsForMultiple() { + Map props = new HashMap<>(); + props.put( + "policies", + "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}]}}"); + props.put( + "updated.openhouse.policy", + "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"aa\", \"interval\":\"2D\"}]}}"); + TableMetadata metadata = mock(TableMetadata.class); + when(metadata.properties()).thenReturn(props); + OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class); + when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod(); + Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a"); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D"); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(1).getDestination(), "aa"); + Assertions.assertEquals( + updatedPolicies.getReplication().getConfig().get(1).getInterval(), "2D"); + Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 2); + } } diff --git a/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperations.java b/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperations.java index bb3c16ac..de648b63 100644 --- a/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperations.java +++ b/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseTableOperations.java @@ -170,7 +170,6 @@ protected CreateUpdateTableRequestBody constructMetadataRequestBody( CreateUpdateTableRequestBody.TableTypeEnum.valueOf( metadata.properties().get(OPENHOUSE_TABLE_TYPE_KEY))); } - return createUpdateTableRequestBody; } @@ -212,6 +211,10 @@ Policies buildUpdatedPolicies(TableMetadata metadata) { } policies.setColumnTags(patchUpdatedPolicy.getColumnTags()); } + // Update replication config + if (patchUpdatedPolicy.getReplication() != null) { + policies.replication(patchUpdatedPolicy.getReplication()); + } return policies; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Policies.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Policies.java index e8fd3235..c855bc34 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Policies.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Policies.java @@ -41,4 +41,11 @@ public class Policies { example = "{'colName': [PII, HC]}") @Valid Map columnTags; + + @Schema( + description = + "Replication as required in /tables API request. This field holds the replication spec config.", + example = "{replication:{config:[{destination: clusterA, interval: 12H}]}}") + @Valid + Replication replication; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Replication.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Replication.java new file mode 100644 index 00000000..43a22ab2 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/Replication.java @@ -0,0 +1,27 @@ +package com.linkedin.openhouse.tables.api.spec.v0.request.components; + +import io.swagger.v3.oas.annotations.media.Schema; +import java.util.List; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Builder(toBuilder = true) +@EqualsAndHashCode +@Getter +@AllArgsConstructor(access = AccessLevel.PROTECTED) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class Replication { + @Schema( + description = + "Replication config for the destination cluster name and replication job interval", + example = "[{destination: clusterA, interval: 12H}, {destination: clusterB, interval: 12H}]") + @NotNull(message = "Incorrect replication policy specified. Replication config cannot be null.") + @Valid + List config; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java new file mode 100644 index 00000000..3bb69284 --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/spec/v0/request/components/ReplicationConfig.java @@ -0,0 +1,32 @@ +package com.linkedin.openhouse.tables.api.spec.v0.request.components; + +import io.swagger.v3.oas.annotations.media.Schema; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; + +@Builder(toBuilder = true) +@EqualsAndHashCode +@Getter +@AllArgsConstructor(access = AccessLevel.PROTECTED) +@NoArgsConstructor(access = AccessLevel.PROTECTED) +public class ReplicationConfig { + @Schema(description = "Replication destination cluster name", example = "clusterA") + @NotNull( + message = + "Incorrect destination specified. Destination field for replication config cannot be null") + @Valid + String destination; + + @Schema( + description = + "Optional parameter interval at which the replication job should run. Default value is 1D", + example = "1D") + @Valid + String interval; +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/OpenHouseTablesApiValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/OpenHouseTablesApiValidator.java index 8f70a469..323047a7 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/OpenHouseTablesApiValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/OpenHouseTablesApiValidator.java @@ -124,14 +124,16 @@ private List validateUUIDForReplicaTable( } private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequestBody) { - if (!policiesSpecValidator.validate( - createUpdateTableRequestBody.getPolicies(), - createUpdateTableRequestBody.getTimePartitioning(), + TableUri tableUri = TableUri.builder() .tableId(createUpdateTableRequestBody.getTableId()) .clusterId(createUpdateTableRequestBody.getClusterId()) .databaseId(createUpdateTableRequestBody.getDatabaseId()) - .build(), + .build(); + if (!policiesSpecValidator.validate( + createUpdateTableRequestBody.getPolicies(), + createUpdateTableRequestBody.getTimePartitioning(), + tableUri, createUpdateTableRequestBody.getSchema())) { throw new RequestValidationFailureException( Arrays.asList( diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidator.java index 064b3b45..01364772 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidator.java @@ -4,6 +4,7 @@ import com.linkedin.openhouse.common.api.spec.TableUri; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import com.linkedin.openhouse.tables.common.DefaultColumnPattern; @@ -86,9 +87,71 @@ public boolean validate( return false; } } + + return validateReplication(policies, tableUri); + } + + /** + * Valid cases for replication object: 0. Interval input can be either be accepted as 12H or daily + * from 1-3D 1. Destination cluster cannot be equal to the source cluster + */ + protected boolean validateReplication(Policies policies, TableUri tableUri) { + if (policies != null + && policies.getReplication() != null + && policies.getReplication().getConfig() != null) { + return policies.getReplication().getConfig().stream() + .allMatch( + replicationConfig -> { + if (replicationConfig.getInterval() != null + && !replicationConfig.getInterval().isEmpty()) { + if (!validateReplicationInterval(replicationConfig)) { + failureMessage = + String.format( + "Replication interval for the table [%s] can either be 12 hours or daily for up to 3 days", + tableUri); + return false; + } + } + if (replicationConfig.getDestination() != null) { + if (!validateReplicationDestination(replicationConfig, tableUri)) { + failureMessage = + String.format( + "Replication destination cluster for the table [%s] must be different from the source cluster", + tableUri); + return false; + } + } + return true; + }); + } return true; } + /** + * Validate that the optional interval parameter provided by users exists as an interval of 12 or + * as a daily value up to 3 days + */ + protected boolean validateReplicationInterval(ReplicationConfig replicationConfig) { + String granularity = + replicationConfig.getInterval().substring(replicationConfig.getInterval().length() - 1); + int interval = + Integer.parseInt( + replicationConfig + .getInterval() + .substring(0, replicationConfig.getInterval().length() - 1)); + + return (interval >= 1 && interval <= 3 && granularity.equals("D")) + || (interval == 12 && granularity.equals("H")); + } + + /** + * Validate that the destination cluster provided by users is not the same as the source cluster + */ + protected boolean validateReplicationDestination( + ReplicationConfig replicationConfig, TableUri tableUri) { + return !replicationConfig.getDestination().toString().equals(tableUri.getClusterId()); + } + /** * Validate the pattern provided by users are legit pattern that complies with {@link * DateTimeFormatter} symbols. Also, the provided column name needs to be part of schema. diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/common/ReplicationInterval.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/common/ReplicationInterval.java new file mode 100644 index 00000000..b2d32c3e --- /dev/null +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/common/ReplicationInterval.java @@ -0,0 +1,19 @@ +package com.linkedin.openhouse.tables.common; + +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; + +/** ENUM for default replication interval associated with Interval in {@link ReplicationConfig} */ +public enum ReplicationInterval { + // default interval to run replication jobs if no interval provided by user + DEFAULT("1D"); + + private final String interval; + + ReplicationInterval(String interval) { + this.interval = interval; + } + + public String getInterval() { + return interval; + } +} diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java index 41fa43fd..065916df 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/dto/mapper/iceberg/PoliciesSpecMapper.java @@ -4,9 +4,14 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonParseException; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.common.DefaultColumnPattern; +import com.linkedin.openhouse.tables.common.ReplicationInterval; import com.linkedin.openhouse.tables.model.TableDto; +import java.util.List; +import java.util.stream.Collectors; import org.mapstruct.Mapper; import org.mapstruct.Named; @@ -61,6 +66,7 @@ public Policies toPoliciesObject(String policiesString) throws JsonParseExceptio @Named("mapPolicies") public Policies mapPolicies(Policies policies) { String defaultPattern; + Policies updatedPolicies = policies; if (policies != null && policies.getRetention() != null && policies.getRetention().getColumnPattern() != null @@ -86,9 +92,44 @@ public Policies mapPolicies(Policies policies) { .pattern(defaultPattern) .build()) .build(); - return policies.toBuilder().retention(retentionPolicy).build(); - } else { - return policies; + updatedPolicies = policies.toBuilder().retention(retentionPolicy).build(); } + if (policies != null && policies.getReplication() != null) { + updatedPolicies = + updatedPolicies + .toBuilder() + .replication(mapReplicationPolicies(policies.getReplication())) + .build(); + } + return updatedPolicies; + } + + /** + * mapRetentionPolicies is a mapStruct function which assigns default interval value in + * replication config if the interval is empty. Default values for pattern are defined at {@link + * ReplicationInterval}. + * + * @param replicationPolicy config for Openhouse table + * @return mapped policies object + */ + private Replication mapReplicationPolicies(Replication replicationPolicy) { + if (replicationPolicy != null && replicationPolicy.getConfig() != null) { + List replicationConfig = + replicationPolicy.getConfig().stream() + .map( + replication -> { + if (replication.getInterval() == null || replication.getInterval().isEmpty()) { + return replication + .toBuilder() + .interval(ReplicationInterval.DEFAULT.getInterval()) + .build(); + } + return replication; + }) + .collect(Collectors.toList()); + + return replicationPolicy.toBuilder().config(replicationConfig).build(); + } + return replicationPolicy; } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidatorTest.java index f150d39e..4cdd8423 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidatorTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/PoliciesSpecValidatorTest.java @@ -5,10 +5,13 @@ import com.linkedin.openhouse.common.api.spec.TableUri; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import java.lang.reflect.Field; +import java.util.Arrays; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Assertions; @@ -249,4 +252,92 @@ void testValidate() { // with error message validation } + + @Test + void testValidateReplicationConfig() { + // Positive: valid replication config + TableUri tableUri = TableUri.builder().clusterId("testClusterA").build(); + ReplicationConfig replication1 = + ReplicationConfig.builder().destination("testClusterB").interval("12H").build(); + + Assertions.assertTrue(validator.validateReplicationDestination(replication1, tableUri)); + Assertions.assertTrue(validator.validateReplicationInterval(replication1)); + + replication1 = ReplicationConfig.builder().destination("testCluster").build(); + Assertions.assertTrue(validator.validateReplicationDestination(replication1, tableUri)); + + // Negative: destination cluster equal to source cluster + replication1 = ReplicationConfig.builder().destination(tableUri.getClusterId()).build(); + Assertions.assertFalse(validator.validateReplicationDestination(replication1, tableUri)); + + replication1 = + ReplicationConfig.builder().destination(tableUri.getClusterId()).interval("12H").build(); + Assertions.assertFalse(validator.validateReplicationDestination(replication1, tableUri)); + + // Negative: invalid interval input + replication1 = + ReplicationConfig.builder().destination(tableUri.getClusterId()).interval("13H").build(); + Assertions.assertFalse(validator.validateReplicationInterval(replication1)); + replication1 = + ReplicationConfig.builder().destination(tableUri.getClusterId()).interval("24H").build(); + Assertions.assertFalse(validator.validateReplicationInterval(replication1)); + replication1 = + ReplicationConfig.builder().destination(tableUri.getClusterId()).interval("48H").build(); + Assertions.assertFalse(validator.validateReplicationInterval(replication1)); + + // Positive: valid replication config with multiple destinations + replication1 = ReplicationConfig.builder().destination("testCluster1").interval("1D").build(); + ReplicationConfig replication2 = + ReplicationConfig.builder().destination("testCluster2").interval("2D").build(); + Policies policies0 = + Policies.builder() + .replication( + Replication.builder().config(Arrays.asList(replication1, replication2)).build()) + .build(); + Assertions.assertTrue( + validator.validate(policies0, null, tableUri, getSchemaJsonFromSchema(dummySchema))); + + // Negative: destination cluster equal to source cluster + replication1 = ReplicationConfig.builder().destination(tableUri.getClusterId()).build(); + policies0 = + Policies.builder() + .replication( + Replication.builder().config(Arrays.asList(replication1, replication2)).build()) + .build(); + + Assertions.assertFalse( + validator.validate(policies0, null, tableUri, getSchemaJsonFromSchema(dummySchema))); + Field failedMsg = + org.springframework.util.ReflectionUtils.findField( + PoliciesSpecValidator.class, "failureMessage"); + Assertions.assertNotNull(failedMsg); + org.springframework.util.ReflectionUtils.makeAccessible(failedMsg); + Assertions.assertTrue( + ((String) org.springframework.util.ReflectionUtils.getField(failedMsg, validator)) + .contains( + String.format( + "Replication destination cluster for the table [%s] must be different from the source cluster", + tableUri))); + + // Negative: invalid interval input + replication1 = ReplicationConfig.builder().destination("testCluster1").interval("13H").build(); + policies0 = + Policies.builder() + .replication( + Replication.builder().config(Arrays.asList(replication1, replication2)).build()) + .build(); + Assertions.assertFalse( + validator.validate(policies0, null, tableUri, getSchemaJsonFromSchema(dummySchema))); + failedMsg = + org.springframework.util.ReflectionUtils.findField( + PoliciesSpecValidator.class, "failureMessage"); + Assertions.assertNotNull(failedMsg); + org.springframework.util.ReflectionUtils.makeAccessible(failedMsg); + Assertions.assertTrue( + ((String) org.springframework.util.ReflectionUtils.getField(failedMsg, validator)) + .contains( + String.format( + "Replication interval for the table [%s] can either be 12 hours or daily for up to 3 days", + tableUri))); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index c4a7b42b..fc474e41 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -26,6 +26,8 @@ import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.api.spec.v0.request.components.PolicyTag; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; @@ -1013,4 +1015,152 @@ public void testCreateRequestSucceedsForNullColumnTags() throws Exception { Assertions.assertNull(updatedPolicies.get("columnTags")); RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); } + + @Test + public void testUpdateSucceedsForReplicationConfig() throws Exception { + MvcResult mvcResult = + RequestAndValidateHelper.createTableAndValidateResponse( + GET_TABLE_RESPONSE_BODY, mvc, storageManager); + + LinkedHashMap currentPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + ReplicationConfig replicationConfig = + ReplicationConfig.builder().destination("clusterA").interval("").build(); + Replication replication = + Replication.builder().config(Arrays.asList(replicationConfig)).build(); + Policies newPolicies = Policies.builder().replication(replication).build(); + + GetTableResponseBody container = GetTableResponseBody.builder().policies(newPolicies).build(); + GetTableResponseBody addProp = buildGetTableResponseBody(mvcResult, container); + mvcResult = + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + ValidationUtilities.CURRENT_MAJOR_VERSION_PREFIX + + "/databases/%s/tables/%s", + addProp.getDatabaseId(), + addProp.getTableId())) + .contentType(MediaType.APPLICATION_JSON) + .content(buildCreateUpdateTableRequestBody(addProp).toJson()) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn(); + + LinkedHashMap updatedPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + Assertions.assertNotEquals(currentPolicies, updatedPolicies); + Assertions.assertEquals( + updatedPolicies.get("replication").get("config").toString(), + "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); + } + + @Test + public void testUpdateSucceedsForReplicationAndRetention() throws Exception { + MvcResult mvcResult = + RequestAndValidateHelper.createTableAndValidateResponse( + GET_TABLE_RESPONSE_BODY + .toBuilder() + .timePartitioning(null) + .policies(TABLE_POLICIES_COMPLEX) + .build(), + mvc, + storageManager); + + LinkedHashMap currentPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + ReplicationConfig replicationConfig = + ReplicationConfig.builder().destination("clusterA").interval("").build(); + Replication replication = + Replication.builder().config(Arrays.asList(replicationConfig)).build(); + Retention retention = + Retention.builder() + .count(4) + .granularity(TimePartitionSpec.Granularity.HOUR) + .columnPattern( + RetentionColumnPattern.builder() + .pattern("yyyy-MM-dd") + .columnName("timestampCol") + .build()) + .build(); + Policies newPolicies = Policies.builder().replication(replication).retention(retention).build(); + + GetTableResponseBody container = GetTableResponseBody.builder().policies(newPolicies).build(); + GetTableResponseBody addProp = buildGetTableResponseBody(mvcResult, container); + mvcResult = + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + ValidationUtilities.CURRENT_MAJOR_VERSION_PREFIX + + "/databases/%s/tables/%s", + addProp.getDatabaseId(), + addProp.getTableId())) + .contentType(MediaType.APPLICATION_JSON) + .content(buildCreateUpdateTableRequestBody(addProp).toJson()) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn(); + + LinkedHashMap updatedPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + Assertions.assertNotEquals(currentPolicies, updatedPolicies); + Assertions.assertEquals( + updatedPolicies.get("replication").get("config").toString(), + "[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]"); + Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4); + Assertions.assertEquals( + ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"), + "timestampCol"); + Assertions.assertEquals( + ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"), + "yyyy-MM-dd"); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); + } + + @Test + public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception { + MvcResult mvcResult = + RequestAndValidateHelper.createTableAndValidateResponse( + GET_TABLE_RESPONSE_BODY, mvc, storageManager); + + LinkedHashMap currentPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + ReplicationConfig replicationConfig1 = + ReplicationConfig.builder().destination("clusterA").interval("").build(); + ReplicationConfig replicationConfig2 = + ReplicationConfig.builder().destination("clusterB").interval("12H").build(); + Replication replication = + Replication.builder().config(Arrays.asList(replicationConfig1, replicationConfig2)).build(); + Policies newPolicies = Policies.builder().replication(replication).build(); + + GetTableResponseBody container = GetTableResponseBody.builder().policies(newPolicies).build(); + GetTableResponseBody addProp = buildGetTableResponseBody(mvcResult, container); + mvcResult = + mvc.perform( + MockMvcRequestBuilders.put( + String.format( + ValidationUtilities.CURRENT_MAJOR_VERSION_PREFIX + + "/databases/%s/tables/%s", + addProp.getDatabaseId(), + addProp.getTableId())) + .contentType(MediaType.APPLICATION_JSON) + .content(buildCreateUpdateTableRequestBody(addProp).toJson()) + .accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn(); + + LinkedHashMap updatedPolicies = + JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies"); + + Assertions.assertNotEquals(currentPolicies, updatedPolicies); + Assertions.assertEquals( + updatedPolicies.get("replication").get("config").toString(), + "[{\"destination\":\"clusterA\",\"interval\":\"1D\"},{\"destination\":\"clusterB\",\"interval\":\"12H\"}]"); + RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY); + } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/api/TablesValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/api/TablesValidatorTest.java index c27e9fc0..4c00bc87 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/api/TablesValidatorTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/api/TablesValidatorTest.java @@ -11,6 +11,8 @@ import com.linkedin.openhouse.tables.api.spec.v0.request.UpdateAclPoliciesRequestBody; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; @@ -282,6 +284,107 @@ public void validateCreateTableRequestParamWithInvalidDaysInPoliciesObject() { } } + @Test + public void validateCreateTableRequestParamWithInvalidReplicationDestinationInPoliciesObject() { + assertThrows( + RequestValidationFailureException.class, + () -> + tablesApiValidator.validateCreateTable( + "c", + "d", + CreateUpdateTableRequestBody.builder() + .databaseId("d") + .tableId("t") + .clusterId("c") + .schema(HEALTH_SCHEMA_LITERAL) + .tableProperties(ImmutableMap.of()) + .timePartitioning( + TimePartitionSpec.builder() + .columnName("timestamp") + .granularity(TimePartitionSpec.Granularity.HOUR) + .build()) + .policies( + Policies.builder() + .replication( + Replication.builder() + .config( + Arrays.asList( + ReplicationConfig.builder().destination("c").build())) + .build()) + .build()) + .baseTableVersion("base") + .build())); + } + + @Test + public void validateCreateTableRequestParamWithInvalidReplicationIntervalInPoliciesObject() { + assertThrows( + RequestValidationFailureException.class, + () -> + tablesApiValidator.validateCreateTable( + "c", + "d", + CreateUpdateTableRequestBody.builder() + .databaseId("d") + .tableId("t") + .clusterId("c") + .schema(HEALTH_SCHEMA_LITERAL) + .tableProperties(ImmutableMap.of()) + .timePartitioning( + TimePartitionSpec.builder() + .columnName("timestamp") + .granularity(TimePartitionSpec.Granularity.HOUR) + .build()) + .policies( + Policies.builder() + .replication( + Replication.builder() + .config( + Arrays.asList( + ReplicationConfig.builder() + .destination("z") + .interval("13H") + .build())) + .build()) + .build()) + .baseTableVersion("base") + .build())); + } + + @Test + public void validateCreateTableRequestParamWithValidReplicationInPoliciesObject() { + assertDoesNotThrow( + () -> + tablesApiValidator.validateCreateTable( + "c", + "d", + CreateUpdateTableRequestBody.builder() + .databaseId("d") + .tableId("t") + .clusterId("c") + .schema(HEALTH_SCHEMA_LITERAL) + .tableProperties(ImmutableMap.of()) + .timePartitioning( + TimePartitionSpec.builder() + .columnName("timestamp") + .granularity(TimePartitionSpec.Granularity.HOUR) + .build()) + .policies( + Policies.builder() + .replication( + Replication.builder() + .config( + Arrays.asList( + ReplicationConfig.builder() + .destination("z") + .interval("12H") + .build())) + .build()) + .build()) + .baseTableVersion("base") + .build())); + } + @Test public void validateCreateTableSpecialCharacterAndEmpty() { assertThrows( diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java index fcee72f4..3f6861a2 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/PoliciesSpecMapperTest.java @@ -34,6 +34,9 @@ public void testToPoliciesSpecJson() { Assertions.assertEquals( (Integer) JsonPath.read(policiesSpec, "$.retention.count"), TableModelConstants.TABLE_POLICIES.getRetention().getCount()); + Assertions.assertEquals( + JsonPath.read(policiesSpec, "$.replication.config[0].destination"), + TableModelConstants.TABLE_POLICIES.getReplication().getConfig().get(0).getDestination()); } @Test diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/TablesMapperTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/TablesMapperTest.java index 0419e66c..11b8e387 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/TablesMapperTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/mapper/TablesMapperTest.java @@ -167,6 +167,31 @@ public void testToTableDtoWithPutSnapshotsEmptyPolicyColumnPattern() { tableDto1.getPolicies().getRetention().getColumnPattern().getColumnName(), "name"); } + @Test + public void testToTableDtoWithReplicationPolicy() { + IcebergSnapshotsRequestBody icebergSnapshotsRequestBody = + IcebergSnapshotsRequestBody.builder() + .baseTableVersion("v1") + .createUpdateTableRequestBody( + CREATE_TABLE_REQUEST_BODY_WITHIN_SNAPSHOTS_REQUEST + .toBuilder() + .policies(TABLE_POLICIES) + .build()) + .jsonSnapshots(Collections.singletonList("dummy")) + .build(); + TableDto tableDto1 = tablesMapper.toTableDto(TABLE_DTO, icebergSnapshotsRequestBody); + Assertions.assertEquals( + tableDto1.getDatabaseId(), + CREATE_TABLE_REQUEST_BODY_WITHIN_SNAPSHOTS_REQUEST.getDatabaseId()); + Assertions.assertEquals( + tableDto1.getTableId(), CREATE_TABLE_REQUEST_BODY_WITHIN_SNAPSHOTS_REQUEST.getTableId()); + Assertions.assertEquals( + tableDto1.getClusterId(), + CREATE_TABLE_REQUEST_BODY_WITHIN_SNAPSHOTS_REQUEST.getClusterId()); + Assertions.assertEquals( + tableDto1.getPolicies().getReplication().getConfig().get(0).getInterval(), "12H"); + } + @Test public void testToTableDtoWithStagedCreate() { TableDto tableDto = TableDto.builder().build(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableModelConstants.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableModelConstants.java index 6cbe28eb..a9e7db5d 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableModelConstants.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/model/TableModelConstants.java @@ -9,6 +9,8 @@ import com.linkedin.openhouse.tables.api.spec.v0.request.IcebergSnapshotsRequestBody; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; @@ -20,6 +22,7 @@ import com.linkedin.openhouse.tables.dto.mapper.attribute.TimePartitionSpecConverter; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -37,8 +40,9 @@ public final class TableModelConstants { // this is evolved on top of HEALTH_SCHEMA_LITERAL. public static final String ADD_OPTIONAL_FIELD; - public static final RetentionColumnPattern COL_PAT; - public static final Retention RETENTION_POLICY; + public static RetentionColumnPattern COL_PAT; + public static Retention RETENTION_POLICY; + public static Replication REPLICATION_POLICY; public static final Retention RETENTION_POLICY_WITH_PATTERN; public static final Retention RETENTION_POLICY_WITH_EMPTY_PATTERN; @@ -58,6 +62,9 @@ public final class TableModelConstants { RETENTION_POLICY = Retention.builder().count(3).granularity(TimePartitionSpec.Granularity.HOUR).build(); + ArrayList configs = new ArrayList<>(); + configs.add(ReplicationConfig.builder().destination("cluster1").interval("12H").build()); + REPLICATION_POLICY = Replication.builder().config(configs).build(); RETENTION_POLICY_WITH_PATTERN = Retention.builder() .count(3) @@ -71,7 +78,8 @@ public final class TableModelConstants { .columnPattern(COL_PAT.toBuilder().pattern("").build()) .build(); - TABLE_POLICIES = Policies.builder().retention(RETENTION_POLICY).build(); + TABLE_POLICIES = + Policies.builder().retention(RETENTION_POLICY).replication(REPLICATION_POLICY).build(); TABLE_POLICIES_COMPLEX = Policies.builder().retention(RETENTION_POLICY_WITH_PATTERN).build(); SHARED_TABLE_POLICIES = Policies.builder().retention(RETENTION_POLICY).sharingEnabled(true).build();