Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Self serve replication API server side implementation #227

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,87 @@ public void testColumnPolicyTagsExistUpdateExistingPolicyTags() {
Assertions.assertTrue(updatedPolicies.getColumnTags().containsKey("col1"));
Assertions.assertEquals(tagHC, updatedPolicies.getColumnTags().get("col1").getTags());
}

@Test
public void testPoliciesReplicationExistsButNoUpdateEmptyInterval() {
Map<String, String> props = new HashMap<>();
props.put(
"policies", "{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a");
Assertions.assertTrue(
updatedPolicies.getReplication().getConfig().get(0).getInterval().isEmpty());
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testNoPoliciesReplicationButUpdateExists() {
Map<String, String> props = new HashMap<>();
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"1D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertNotNull(updatedPolicies);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testPoliciesReplicationExistsUpdateExists() {
Map<String, String> props = new HashMap<>();
props.put(
"policies",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"b\", \"interval\":\"1D\"}]}}");
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"aa\", \"interval\":\"2D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 1);
}

@Test
public void testPoliciesReplicationExistsUpdateExistsForMultiple() {
Map<String, String> props = new HashMap<>();
props.put(
"policies",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}]}}");
props.put(
"updated.openhouse.policy",
"{\"replication\":{\"config\":[{\"destination\":\"a\", \"interval\":\"1D\"}, {\"destination\":\"aa\", \"interval\":\"2D\"}]}}");
TableMetadata metadata = mock(TableMetadata.class);
when(metadata.properties()).thenReturn(props);
OpenHouseTableOperations openHouseTableOperations = mock(OpenHouseTableOperations.class);
when(openHouseTableOperations.buildUpdatedPolicies(metadata)).thenCallRealMethod();
Policies updatedPolicies = openHouseTableOperations.buildUpdatedPolicies(metadata);
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getDestination(), "a");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(0).getInterval(), "1D");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(1).getDestination(), "aa");
Assertions.assertEquals(
updatedPolicies.getReplication().getConfig().get(1).getInterval(), "2D");
Assertions.assertEquals(updatedPolicies.getReplication().getConfig().size(), 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ protected CreateUpdateTableRequestBody constructMetadataRequestBody(
CreateUpdateTableRequestBody.TableTypeEnum.valueOf(
metadata.properties().get(OPENHOUSE_TABLE_TYPE_KEY)));
}

return createUpdateTableRequestBody;
}

Expand Down Expand Up @@ -212,6 +211,10 @@ Policies buildUpdatedPolicies(TableMetadata metadata) {
}
policies.setColumnTags(patchUpdatedPolicy.getColumnTags());
}
// Update replication config
if (patchUpdatedPolicy.getReplication() != null) {
policies.replication(patchUpdatedPolicy.getReplication());
}
return policies;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public class Policies {
example = "{'colName': [PII, HC]}")
@Valid
Map<String, PolicyTag> columnTags;

@Schema(
description =
"Replication as required in /tables API request. This field holds the replication spec config.",
example = "{replication:{config:[{destination: clusterA, interval: 12H}]}}")
@Valid
Replication replication;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
chenselena marked this conversation as resolved.
Show resolved Hide resolved
public class Replication {
@Schema(
description =
"Replication config for the destination cluster name and replication job interval",
example = "[{destination: clusterA, interval: 12H}, {destination: clusterB, interval: 12H}]")
@NotNull(message = "Incorrect replication policy specified. Replication config cannot be null.")
@Valid
chenselena marked this conversation as resolved.
Show resolved Hide resolved
List<ReplicationConfig> config;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.linkedin.openhouse.tables.api.spec.v0.request.components;

import io.swagger.v3.oas.annotations.media.Schema;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;

@Builder(toBuilder = true)
@EqualsAndHashCode
@Getter
@AllArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ReplicationConfig {
@Schema(description = "Replication destination cluster name", example = "clusterA")
@NotNull(
message =
"Incorrect destination specified. Destination field for replication config cannot be null")
@Valid
String destination;
chenselena marked this conversation as resolved.
Show resolved Hide resolved

@Schema(
description =
"Optional parameter interval at which the replication job should run. Default value is 1D",
example = "1D")
@Valid
String interval;
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,16 @@ private List<String> validateUUIDForReplicaTable(
}

private void validatePolicies(CreateUpdateTableRequestBody createUpdateTableRequestBody) {
if (!policiesSpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
TableUri tableUri =
TableUri.builder()
.tableId(createUpdateTableRequestBody.getTableId())
.clusterId(createUpdateTableRequestBody.getClusterId())
.databaseId(createUpdateTableRequestBody.getDatabaseId())
.build(),
.build();
if (!policiesSpecValidator.validate(
createUpdateTableRequestBody.getPolicies(),
createUpdateTableRequestBody.getTimePartitioning(),
tableUri,
createUpdateTableRequestBody.getSchema())) {
throw new RequestValidationFailureException(
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
Expand Down Expand Up @@ -86,9 +87,71 @@ public boolean validate(
return false;
}
}

return validateReplication(policies, tableUri);
}

/**
* Valid cases for replication object: 0. Interval input can be either be accepted as 12H or daily
* from 1-3D 1. Destination cluster cannot be equal to the source cluster
*/
protected boolean validateReplication(Policies policies, TableUri tableUri) {
if (policies != null
&& policies.getReplication() != null
&& policies.getReplication().getConfig() != null) {
return policies.getReplication().getConfig().stream()
.allMatch(
replicationConfig -> {
if (replicationConfig.getInterval() != null
&& !replicationConfig.getInterval().isEmpty()) {
if (!validateReplicationInterval(replicationConfig)) {
failureMessage =
String.format(
"Replication interval for the table [%s] can either be 12 hours or daily for up to 3 days",
tableUri);
return false;
}
}
if (replicationConfig.getDestination() != null) {
if (!validateReplicationDestination(replicationConfig, tableUri)) {
failureMessage =
String.format(
"Replication destination cluster for the table [%s] must be different from the source cluster",
rohitkum2506 marked this conversation as resolved.
Show resolved Hide resolved
tableUri);
return false;
}
}
return true;
});
}
return true;
}

/**
* Validate that the optional interval parameter provided by users exists as an interval of 12 or
* as a daily value up to 3 days
*/
protected boolean validateReplicationInterval(ReplicationConfig replicationConfig) {
String granularity =
replicationConfig.getInterval().substring(replicationConfig.getInterval().length() - 1);
int interval =
Integer.parseInt(
replicationConfig
.getInterval()
.substring(0, replicationConfig.getInterval().length() - 1));

return (interval >= 1 && interval <= 3 && granularity.equals("D"))
|| (interval == 12 && granularity.equals("H"));
}

/**
* Validate that the destination cluster provided by users is not the same as the source cluster
*/
protected boolean validateReplicationDestination(
ReplicationConfig replicationConfig, TableUri tableUri) {
return !replicationConfig.getDestination().toString().equals(tableUri.getClusterId());
}

/**
* Validate the pattern provided by users are legit pattern that complies with {@link
* DateTimeFormatter} symbols. Also, the provided column name needs to be part of schema.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.openhouse.tables.common;

import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;

/** ENUM for default replication interval associated with Interval in {@link ReplicationConfig} */
public enum ReplicationInterval {
// default interval to run replication jobs if no interval provided by user
DEFAULT("1D");

private final String interval;

ReplicationInterval(String interval) {
this.interval = interval;
}

public String getInterval() {
return interval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParseException;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Replication;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
import com.linkedin.openhouse.tables.common.ReplicationInterval;
import com.linkedin.openhouse.tables.model.TableDto;
import java.util.List;
import java.util.stream.Collectors;
import org.mapstruct.Mapper;
import org.mapstruct.Named;

Expand Down Expand Up @@ -61,6 +66,7 @@ public Policies toPoliciesObject(String policiesString) throws JsonParseExceptio
@Named("mapPolicies")
public Policies mapPolicies(Policies policies) {
String defaultPattern;
Policies updatedPolicies = policies;
if (policies != null
&& policies.getRetention() != null
&& policies.getRetention().getColumnPattern() != null
Expand All @@ -86,9 +92,44 @@ public Policies mapPolicies(Policies policies) {
.pattern(defaultPattern)
.build())
.build();
return policies.toBuilder().retention(retentionPolicy).build();
} else {
return policies;
updatedPolicies = policies.toBuilder().retention(retentionPolicy).build();
}
if (policies != null && policies.getReplication() != null) {
updatedPolicies =
updatedPolicies
.toBuilder()
.replication(mapReplicationPolicies(policies.getReplication()))
.build();
}
return updatedPolicies;
}

/**
* mapRetentionPolicies is a mapStruct function which assigns default interval value in
* replication config if the interval is empty. Default values for pattern are defined at {@link
* ReplicationInterval}.
*
* @param replicationPolicy config for Openhouse table
* @return mapped policies object
*/
private Replication mapReplicationPolicies(Replication replicationPolicy) {
if (replicationPolicy != null && replicationPolicy.getConfig() != null) {
List<ReplicationConfig> replicationConfig =
replicationPolicy.getConfig().stream()
.map(
replication -> {
if (replication.getInterval() == null || replication.getInterval().isEmpty()) {
return replication
.toBuilder()
.interval(ReplicationInterval.DEFAULT.getInterval())
.build();
}
return replication;
})
.collect(Collectors.toList());

return replicationPolicy.toBuilder().config(replicationConfig).build();
}
return replicationPolicy;
}
}
Loading
Loading