Skip to content

Commit

Permalink
[controller] Harden update-store workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
nisargthakkar committed Dec 8, 2024
1 parent 7f491e8 commit 9117bc8
Show file tree
Hide file tree
Showing 64 changed files with 4,488 additions and 3,067 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ private StoreInfo getStoreInfo(Consumer<StoreInfo> info, boolean applyFirst) {
storeInfo.setChunkingEnabled(false);
storeInfo.setCompressionStrategy(CompressionStrategy.NO_OP);
storeInfo.setWriteComputationEnabled(false);
storeInfo.setIncrementalPushEnabled(false);
storeInfo.setNativeReplicationSourceFabric("dc-0");
Map<String, Integer> coloMaps = new HashMap<String, Integer>() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,16 @@ private ConfigKeys() {
"controller.store.graveyard.cleanup.sleep.interval.between.list.fetch.minutes";

/**
* Whether the superset schema generation in Parent Controller should be done via passed callback or not.
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
*/
public static final String CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.external.superset.schema.generation.enabled";

/**
* Whether the superset schema generation in Primary Controller should be done via passed callback or not.
* @deprecated Use {@link #CONTROLLER_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED}
*/
@Deprecated
public static final String CONTROLLER_PARENT_EXTERNAL_SUPERSET_SCHEMA_GENERATION_ENABLED =
"controller.parent.external.superset.schema.generation.enabled";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ private void deleteStores(List<String> storeNames) {

/**
 * Looks up the storage persona that the given store belongs to.
 *
 * @param storeName name of the store to look up
 * @return the owning {@link StoragePersona}, or {@code null} if the store is not mapped to any persona
 */
public StoragePersona getPersonaContainingStore(String storeName) {
  String personaName = storeNamePersonaMap.get(storeName);
  // The diffed span contained both the old unbraced and new braced conditional;
  // keep a single, fully-braced guard clause per house style.
  if (personaName == null) {
    return null;
  }
  return getPersona(personaName);
}

private boolean isStoreSetValid(StoragePersona persona, Optional<Store> additionalStore) {
Set<String> setToValidate = new HashSet<>();
if (additionalStore.isPresent())
setToValidate.add(additionalStore.get().getName());
additionalStore.ifPresent(store -> setToValidate.add(store.getName()));
setToValidate.addAll(persona.getStoresToEnforce());
return setToValidate.stream()
.allMatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public enum BackupStrategy {
// KEEP_IN_KAFKA_ONLY,
/** Keep in user-specified store eg HDD, other DB */
// KEEP_IN_USER_STORE;
private int value;
private final int value;

BackupStrategy(int v) {
this.value = v;
Expand All @@ -35,6 +35,10 @@ public enum BackupStrategy {
Arrays.stream(values()).forEach(s -> idMapping.put(s.value, s));
}

/**
 * Returns the stable integer id backing this enum constant (the value used by
 * {@code fromInt} for reverse lookup).
 */
public int getValue() {
  return value;
}

public static BackupStrategy fromInt(int i) {
BackupStrategy strategy = idMapping.get(i);
if (strategy == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC
void setRealTimeTopicName(String realTimeTopicName);

HybridStoreConfig clone();

/**
 * A store config counts as hybrid when a non-negative rewind time is set together
 * with at least one go-online lag threshold (offset-based or producer-timestamp-based).
 */
default boolean isHybrid() {
  boolean rewindConfigured = getRewindTimeInSeconds() >= 0;
  boolean lagThresholdConfigured = getOffsetLagThresholdToGoOnline() >= 0
      || getProducerTimestampLagThresholdToGoOnlineInSeconds() >= 0;
  return rewindConfigured && lagThresholdConfigured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public int hashCode() {

@JsonIgnore
public PartitionerConfig clone() {
  // The diffed span contained both the old shallow-copy return and the new
  // deep-copy return; keep only the deep-copy version so the clone does not
  // share its mutable partitioner-params map with the original.
  return new PartitionerConfigImpl(
      getPartitionerClass(),
      new HashMap<>(getPartitionerParams()),
      getAmplificationFactor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public ZKStore(Store store) {
setSchemaAutoRegisterFromPushJobEnabled(store.isSchemaAutoRegisterFromPushJobEnabled());
setLatestSuperSetValueSchemaId(store.getLatestSuperSetValueSchemaId());
setHybridStoreDiskQuotaEnabled(store.isHybridStoreDiskQuotaEnabled());
setEtlStoreConfig(store.getEtlStoreConfig());
setEtlStoreConfig(store.getEtlStoreConfig().clone());
setStoreMetadataSystemStoreEnabled(store.isStoreMetadataSystemStoreEnabled());
setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp());
setBackupVersionRetentionMs(store.getBackupVersionRetentionMs());
Expand All @@ -220,7 +220,7 @@ public ZKStore(Store store) {
setStoreMetaSystemStoreEnabled(store.isStoreMetaSystemStoreEnabled());
setActiveActiveReplicationEnabled(store.isActiveActiveReplicationEnabled());
setRmdVersion(store.getRmdVersion());
setViewConfigs(store.getViewConfigs());
setViewConfigs(new HashMap<>(store.getViewConfigs()));
setStorageNodeReadQuotaEnabled(store.isStorageNodeReadQuotaEnabled());
setUnusedSchemaDeletionEnabled(store.isUnusedSchemaDeletionEnabled());
setMinCompactionLagSeconds(store.getMinCompactionLagSeconds());
Expand Down Expand Up @@ -368,11 +368,7 @@ public void setLargestUsedVersionNumber(int largestUsedVersionNumber) {
@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
@Override
public long getStorageQuotaInByte() {
// This is a safeguard in case that some old stores do not have storage quota field
return (this.storeProperties.storageQuotaInByte <= 0
&& this.storeProperties.storageQuotaInByte != UNLIMITED_STORAGE_QUOTA)
? DEFAULT_STORAGE_QUOTA
: this.storeProperties.storageQuotaInByte;
return this.storeProperties.storageQuotaInByte;
}

@SuppressWarnings("unused") // Used by Serializer/De-serializer for storing to Zoo Keeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,16 @@ public static int calculatePartitionCount(
} else if (partitionCount < minPartitionCount) {
partitionCount = minPartitionCount;
}

int returnPartitionCount = partitionCount <= 0 ? 1 : (int) partitionCount;

LOGGER.info(
"Assign partition count: {} calculated by storage quota: {} to the new version of store: {}",
partitionCount,
returnPartitionCount,
storageQuota,
storeName);
return (int) partitionCount;

return returnPartitionCount;
}

public static VenicePartitioner getVenicePartitioner(PartitionerConfig config) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.linkedin.venice.views;

import com.linkedin.avroutil1.compatibility.shaded.org.apache.commons.lang3.tuple.Pair;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewParameterKeys;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -43,9 +45,6 @@ public void validateConfigs() {
throw new VeniceException(
String.format(MISSING_PARAMETER_MESSAGE, ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()));
}
if (store.getViewConfigs().containsKey(viewName)) {
throw new VeniceException("A view config with the same view name already exist, view name: " + viewName);
}
String viewPartitioner = viewParameters.get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name());
if (viewPartitioner == null) {
throw new VeniceException(
Expand All @@ -68,18 +67,21 @@ public void validateConfigs() {
throw new VeniceException(
"A materialized view with the same partitioner and partition count as the original store is not allowed!");
}
// Check if there is already a materialized view with identical configs
// Check if multiple materialized views share identical configs
Map<Pair<String, Integer>, String> partitionerAndCounts = new HashMap<>(store.getViewConfigs().size());
for (Map.Entry<String, ViewConfig> viewConfigEntries: store.getViewConfigs().entrySet()) {
ViewConfig viewConfig = viewConfigEntries.getValue();
if (viewConfig.getViewClassName().equals(MaterializedView.class.getCanonicalName())) {
String configPartitioner =
viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITIONER.name());
int configPartitionCount = Integer
.parseInt(viewConfig.getViewParameters().get(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name()));
if (configPartitionCount == viewPartitionCount && configPartitioner.equals(viewPartitioner)) {
Pair<String, Integer> partitionerAndCount = Pair.of(configPartitioner, configPartitionCount);
if ((viewName = partitionerAndCounts.get(partitionerAndCount)) != null) {
throw new VeniceException(
"A view with identical view configs already exist, view name: " + viewConfigEntries.getKey());
String.format("Views '%s' and '%s' have identical view configs", viewName, viewConfigEntries.getKey()));
}
partitionerAndCounts.put(partitionerAndCount, viewConfigEntries.getKey());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

import com.linkedin.venice.exceptions.VeniceException;
import org.testng.annotations.Test;


/** Unit tests for the int &lt;-&gt; enum mapping of {@code BackupStrategy}. */
public class BackupStrategyTest {
  @Test
  public void testFromInt() {
    BackupStrategy zero = BackupStrategy.fromInt(0);
    BackupStrategy one = BackupStrategy.fromInt(1);
    assertEquals(zero, BackupStrategy.KEEP_MIN_VERSIONS);
    assertEquals(one, BackupStrategy.DELETE_ON_NEW_PUSH_START);
    // Only ids 0 and 1 are defined; any other id must be rejected.
    assertThrows(VeniceException.class, () -> BackupStrategy.fromInt(2));
  }

  @Test
  public void testGetValue() {
    // getValue must round-trip with fromInt's id space.
    assertEquals(BackupStrategy.KEEP_MIN_VERSIONS.getValue(), 0);
    assertEquals(BackupStrategy.DELETE_ON_NEW_PUSH_START.getValue(), 1);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,26 @@ public void deserializes() throws IOException {
Assert.assertEquals(fasterXml.getRewindTimeInSeconds(), 123L);
Assert.assertEquals(fasterXml.getDataReplicationPolicy(), DataReplicationPolicy.NON_AGGREGATE);
}

@Test
public void testIsHybrid() {
  // Each row: rewindSeconds, offsetLagThreshold, producerTimestampLagThreshold.
  long[][] inputs = { { -1, -1, -1 }, { 100, -1, -1 }, { 100, 100, -1 }, { 100, 100, 100 }, { 100, -1, 100 },
      { -1, -1, 100 } };
  // Hybrid requires a non-negative rewind plus at least one non-negative lag threshold.
  boolean[] expected = { false, false, true, true, true, false };
  for (int i = 0; i < inputs.length; i++) {
    HybridStoreConfig hybridStoreConfig =
        new HybridStoreConfigImpl(inputs[i][0], inputs[i][1], inputs[i][2], null, null);
    Assert.assertEquals(hybridStoreConfig.isHybrid(), expected[i], "Unexpected isHybrid() for case " + i);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameterKeys;
import com.linkedin.venice.partitioner.ConstantVenicePartitioner;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
Expand Down Expand Up @@ -64,7 +65,15 @@ public void testValidateConfigs() {
existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(12));
doReturn(existingViewConfigParams).when(viewConfig).getViewParameters();
doReturn(MaterializedView.class.getCanonicalName()).when(viewConfig).getViewClassName();
doReturn(Collections.singletonMap("old-view", viewConfig)).when(storeWithExistingViews).getViewConfigs();
Map<String, ViewConfig> viewConfigMap = new HashMap() {
{
put("old-view", viewConfig);
put(
viewParams.get(ViewParameterKeys.MATERIALIZED_VIEW_NAME.name()),
new ViewConfigImpl(MaterializedView.class.getCanonicalName(), viewParams));
}
};
doReturn(viewConfigMap).when(storeWithExistingViews).getViewConfigs();
// Fail due to existing identical view config
assertThrows(() -> new MaterializedView(properties, storeWithExistingViews, viewParams).validateConfigs());
existingViewConfigParams.put(ViewParameterKeys.MATERIALIZED_VIEW_PARTITION_COUNT.name(), Integer.toString(36));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,40 @@ public void testClusterLevelActiveActiveReplicationConfigForNewHybridStores() th
assertFalse(parentControllerClient.getStore(storeName).getStore().isActiveActiveReplicationEnabled());
});
}

@Test(timeOut = TEST_TIMEOUT)
public void testClusterLevelActiveActiveReplicationConfigForNewIncrementalPushStores() throws IOException {
  String storeName = Utils.getUniqueString("test-store-incremental");
  String pushJobId1 = "test-push-job-id-1";
  parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
  parentControllerClient.emptyPush(storeName, pushJobId1, 1);

  // Version 1 should exist.
  StoreInfo store = assertCommand(parentControllerClient.getStore(storeName)).getStore();
  assertEquals(store.getVersions().size(), 1);

  // A freshly created store should have neither Active/Active nor incremental push enabled.
  // (The original asserted isActiveActiveReplicationEnabled twice; the duplicate is removed.)
  assertFalse(store.isActiveActiveReplicationEnabled());
  assertFalse(store.isIncrementalPushEnabled());

  // Convert to an incremental push store; the cluster-level config should auto-enable A/A.
  assertCommand(
      parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(true)));
  TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
    StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
    assertTrue(storeToTest.isIncrementalPushEnabled());
    assertTrue(storeToTest.isActiveActiveReplicationEnabled());
  });

  // After inc push is disabled, even default A/A config for pure hybrid store is false,
  // original store A/A config is enabled.
  assertCommand(
      parentControllerClient.updateStore(storeName, new UpdateStoreQueryParams().setIncrementalPushEnabled(false)));
  TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
    StoreInfo storeToTest = parentControllerClient.getStore(storeName).getStore();
    assertFalse(storeToTest.isIncrementalPushEnabled());
    assertTrue(storeToTest.isActiveActiveReplicationEnabled());
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
Expand Down Expand Up @@ -97,6 +98,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
parentControllerClient.updateStore(
storeName,
new UpdateStoreQueryParams().setIncrementalPushEnabled(true)
.setHybridDataReplicationPolicy(DataReplicationPolicy.NONE)
.setHybridRewindSeconds(1L)
.setHybridOffsetLagThreshold(10)));
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
Expand All @@ -107,7 +109,7 @@ public void testClusterLevelNativeReplicationConfigForNewStores() {
}

@Test(timeOut = TEST_TIMEOUT)
public void testConvertHybridDuringPushjob() {
public void testConvertHybridDuringPushJob() {
String storeName = Utils.getUniqueString("test-store");
parentControllerClient.createNewStore(storeName, "test-owner", "\"string\"", "\"string\"");
parentControllerClient.requestTopicForWrites(
Expand All @@ -128,7 +130,7 @@ public void testConvertHybridDuringPushjob() {
storeName,
new UpdateStoreQueryParams().setHybridRewindSeconds(1L).setHybridOffsetLagThreshold(1L));
Assert.assertTrue(response.isError());
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a pushjob running"));
Assert.assertTrue(response.getError().contains("Cannot convert to hybrid as there is already a push job running"));
parentControllerClient.killOfflinePushJob(Version.composeKafkaTopic(storeName, 1));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,10 @@ public void testEnableActiveActiveReplicationSchema() {
Assert.assertFalse(schemaResponse2.isError(), "addValeSchema returned error: " + schemaResponse2.getError());

// Enable AA on store
UpdateStoreQueryParams updateStoreToEnableAARepl =
new UpdateStoreQueryParams().setNativeReplicationEnabled(true).setActiveActiveReplicationEnabled(true);
UpdateStoreQueryParams updateStoreToEnableAARepl = new UpdateStoreQueryParams().setNativeReplicationEnabled(true)
.setActiveActiveReplicationEnabled(true)
.setHybridOffsetLagThreshold(1000)
.setHybridRewindSeconds(1000);
TestWriteUtils.updateStore(storeName, parentControllerClient, updateStoreToEnableAARepl);
/**
* Test Active/Active replication config enablement generates the active active metadata schema.
Expand Down
Loading

0 comments on commit 9117bc8

Please sign in to comment.