Skip to content

Commit

Permalink
Change cluster naming to destination
Browse files Browse the repository at this point in the history
  • Loading branch information
chenselena committed Oct 24, 2024
1 parent e9fa600 commit e533299
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected boolean validateReplication(Policies policies, TableUri tableUri) {
}
}
if (replicationConfig.getDestination() != null) {
if (!validateReplicationCluster(replicationConfig, tableUri)) {
if (!validateReplicationDestination(replicationConfig, tableUri)) {
failureMessage =
String.format(
"Replication destination cluster for the table [%s] must be different from the source cluster",
Expand Down Expand Up @@ -147,7 +147,7 @@ protected boolean validateReplicationInterval(ReplicationConfig replicationConfi
/**
* Validate that the destination cluster provided by users is not the same as the source cluster
*/
protected boolean validateReplicationCluster(
protected boolean validateReplicationDestination(
ReplicationConfig replicationConfig, TableUri tableUri) {
return !replicationConfig.getDestination().toString().equals(tableUri.getClusterId());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.linkedin.openhouse.tables.common;

/**
* ENUM for default replication interval associated with Interval in {@link
* com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig}
*/
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;

/** ENUM for default replication interval associated with Interval in {@link ReplicationConfig} */
public enum ReplicationInterval {
// default interval to run replication jobs if no interval provided by user
DEFAULT("1D");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Policies mapPolicies(Policies policies) {
}
if (policies != null && policies.getReplication() != null) {
updatedPolicies =
policies
updatedPolicies
.toBuilder()
.replication(mapReplicationPolicies(policies.getReplication()))
.build();
Expand All @@ -112,8 +112,7 @@ public Policies mapPolicies(Policies policies) {
* @param replicationPolicy config for Openhouse table
* @return mapped policies object
*/
@Named("mapReplicationPolicies")
public Replication mapReplicationPolicies(Replication replicationPolicy) {
private Replication mapReplicationPolicies(Replication replicationPolicy) {
if (replicationPolicy != null && replicationPolicy.getConfig() != null) {
List<ReplicationConfig> replicationConfig =
replicationPolicy.getConfig().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,19 @@ void testValidateReplicationConfig() {
ReplicationConfig replication1 =
ReplicationConfig.builder().destination("testClusterB").interval("12H").build();

Assertions.assertTrue(validator.validateReplicationCluster(replication1, tableUri));
Assertions.assertTrue(validator.validateReplicationDestination(replication1, tableUri));
Assertions.assertTrue(validator.validateReplicationInterval(replication1));

replication1 = ReplicationConfig.builder().destination("testCluster").build();
Assertions.assertTrue(validator.validateReplicationCluster(replication1, tableUri));
Assertions.assertTrue(validator.validateReplicationDestination(replication1, tableUri));

// Negative: destination cluster equal to source cluster
replication1 = ReplicationConfig.builder().destination(tableUri.getClusterId()).build();
Assertions.assertFalse(validator.validateReplicationCluster(replication1, tableUri));
Assertions.assertFalse(validator.validateReplicationDestination(replication1, tableUri));

replication1 =
ReplicationConfig.builder().destination(tableUri.getClusterId()).interval("12H").build();
Assertions.assertFalse(validator.validateReplicationCluster(replication1, tableUri));
Assertions.assertFalse(validator.validateReplicationDestination(replication1, tableUri));

// Negative: invalid interval input
replication1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,70 @@ public void testUpdateSucceedsForReplicationConfig() throws Exception {
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}

@Test
public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
MvcResult mvcResult =
RequestAndValidateHelper.createTableAndValidateResponse(
GET_TABLE_RESPONSE_BODY
.toBuilder()
.timePartitioning(null)
.policies(TABLE_POLICIES_COMPLEX)
.build(),
mvc,
storageManager);

LinkedHashMap<String, LinkedHashMap> currentPolicies =
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

ReplicationConfig replicationConfig =
ReplicationConfig.builder().destination("clusterA").interval("").build();
Replication replication =
Replication.builder().config(Arrays.asList(replicationConfig)).build();
Retention retention =
Retention.builder()
.count(4)
.granularity(TimePartitionSpec.Granularity.HOUR)
.columnPattern(
RetentionColumnPattern.builder()
.pattern("yyyy-MM-dd")
.columnName("timestampCol")
.build())
.build();
Policies newPolicies = Policies.builder().replication(replication).retention(retention).build();

GetTableResponseBody container = GetTableResponseBody.builder().policies(newPolicies).build();
GetTableResponseBody addProp = buildGetTableResponseBody(mvcResult, container);
mvcResult =
mvc.perform(
MockMvcRequestBuilders.put(
String.format(
ValidationUtilities.CURRENT_MAJOR_VERSION_PREFIX
+ "/databases/%s/tables/%s",
addProp.getDatabaseId(),
addProp.getTableId()))
.contentType(MediaType.APPLICATION_JSON)
.content(buildCreateUpdateTableRequestBody(addProp).toJson())
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn();

LinkedHashMap<String, LinkedHashMap> updatedPolicies =
JsonPath.read(mvcResult.getResponse().getContentAsString(), "$.policies");

Assertions.assertNotEquals(currentPolicies, updatedPolicies);
Assertions.assertEquals(
updatedPolicies.get("replication").get("config").toString(),
"[{\"destination\":\"clusterA\",\"interval\":\"1D\"}]");
Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
"timestampCol");
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"),
"yyyy-MM-dd");
RequestAndValidateHelper.deleteTableAndValidateResponse(mvc, GET_TABLE_RESPONSE_BODY);
}

@Test
public void testUpdateSucceedsForMultipleReplicationConfig() throws Exception {
MvcResult mvcResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void validateCreateTableRequestParamWithInvalidDaysInPoliciesObject() {
}

@Test
public void validateCreateTableRequestParamWithInvalidReplicationClusterInPoliciesObject() {
public void validateCreateTableRequestParamWithInvalidReplicationDestinationInPoliciesObject() {
assertThrows(
RequestValidationFailureException.class,
() ->
Expand Down

0 comments on commit e533299

Please sign in to comment.