[controller] add a new config that, when set, will update real time topic during partition count update #1403

Closed · wants to merge 1 commit
@@ -2356,4 +2356,6 @@ private ConfigKeys() {

public static final String SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP =
"server.delete.unassigned.partitions.on.startup";

public static final String UPDATE_REAL_TIME_TOPIC = "update.real.time.topic";
}
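
For orientation, here is a minimal sketch of how the new key could be switched on when standing up a controller. The key string mirrors the ConfigKeys.UPDATE_REAL_TIME_TOPIC constant added above; the class and property-loading boilerplate around it are illustrative only.

import java.util.Properties;

public class EnableRtTopicRenameSketch {
  public static void main(String[] args) {
    // Illustrative only: these properties would be handed to whatever builds the
    // controller's configuration; the key matches ConfigKeys.UPDATE_REAL_TIME_TOPIC.
    Properties controllerProps = new Properties();
    controllerProps.setProperty("update.real.time.topic", "true");
    System.out.println(controllerProps);
  }
}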
@@ -632,6 +632,29 @@ private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, Stri
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

public static String createNewRealTimeTopicName(String oldRealTimeTopicName) {
if (oldRealTimeTopicName == null || !oldRealTimeTopicName.endsWith(Version.REAL_TIME_TOPIC_SUFFIX)) {
throw new IllegalArgumentException("Invalid old real-time topic name: " + oldRealTimeTopicName);
}

// Extract the base name and current version
String base =
oldRealTimeTopicName.substring(0, oldRealTimeTopicName.length() - Version.REAL_TIME_TOPIC_SUFFIX.length());
String[] parts = base.split("_v");

String newBase;
if (parts.length == 2) {
// Increment the version
int version = Integer.parseInt(parts[1]) + 1;
newBase = parts[0] + "_v" + version;
} else {
// Start with version 2
newBase = base + "_v2";
}

return newBase + Version.REAL_TIME_TOPIC_SUFFIX;
}

private static class TimeUnitInfo {
String suffix;
int multiplier;
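To make the renaming scheme concrete, the sketch below chains the new helper a few times; it assumes the Utils class from this diff (venice-common) is on the classpath, and the printed names match the expected values in the unit tests that follow.

import com.linkedin.venice.utils.Utils;

public class RtTopicRenameProgression {
  public static void main(String[] args) {
    // Each call bumps the version marker that sits in front of the "_rt" suffix.
    String name = "storeName_rt";
    for (int i = 0; i < 3; i++) {
      name = Utils.createNewRealTimeTopicName(name);
      System.out.println(name); // storeName_v2_rt, then storeName_v3_rt, then storeName_v4_rt
    }
  }
}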
@@ -393,4 +393,47 @@ public void testGetLeaderTopicFromPubSubTopic() {
Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, separateRealTimeTopic),
realTimeTopic);
}

@Test
void testValidOldNameWithVersionIncrement() {
String oldName = "storeName_v1_rt";
String expectedNewName = "storeName_v2_rt";

String result = Utils.createNewRealTimeTopicName(oldName);

assertEquals(expectedNewName, result);
}

@Test
void testValidOldNameStartingNewVersion() {
String oldName = "storeName_rt";
String expectedNewName = "storeName_v2_rt";

String result = Utils.createNewRealTimeTopicName(oldName);

assertEquals(expectedNewName, result);
}

@Test
void testInvalidOldNameNull() {
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(null));
}

@Test
void testInvalidOldNameWithoutSuffix() {
String oldName = "storeName_v1";
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}

@Test
void testInvalidOldNameIncorrectFormat() {
String oldName = "storeName_v1_rt_extra";
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}

@Test
void testInvalidOldNameWithNonNumericVersion() {
String oldName = "storeName_vX_rt";
assertThrows(NumberFormatException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}
}
@@ -2,6 +2,7 @@

import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.UPDATE_REAL_TIME_TOPIC;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -12,11 +13,14 @@
import com.linkedin.venice.controllerapi.MultiStoreResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import java.util.Arrays;
@@ -28,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -50,6 +55,7 @@ public void setUp() {
Properties parentControllerProperties = new Properties();
parentControllerProperties.setProperty(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, "false");
parentControllerProperties.setProperty(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, "false");
parentControllerProperties.setProperty(UPDATE_REAL_TIME_TOPIC, "true");

multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
NUMBER_OF_CHILD_DATACENTERS,
@@ -326,4 +332,59 @@ private void validateStoreMigrationStatus(
}
});
}

@Test(timeOut = TEST_TIMEOUT)
public void testUpdateStoreRealTimeTopicName() throws Exception {
String storeName = Utils.getUniqueString("testUpdateStoreRealTimeTopicName");
List<VeniceControllerWrapper> parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
String clusterName = clusterNames[0];
String parentControllerURLs =
parentControllers.stream().map(VeniceControllerWrapper::getControllerUrl).collect(Collectors.joining(","));
UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(2)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);

try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs)) {
TestUtils.assertCommand(
parentControllerClient
.retryableRequest(5, c -> c.createNewStore(storeName, "test", "\"string\"", "\"string\"")));

TestUtils
.assertCommand(parentControllerClient.retryableRequest(5, c -> c.updateStore(storeName, updateStoreParams)));

multiRegionMultiClusterWrapper.getLeaderParentControllerWithRetries(clusterName)
.getVeniceAdmin()
.incrementVersionIdempotent(clusterName, storeName, Version.guidBasedDummyPushId(), 1, 1);

// Bump the partition count via the admin tool; with update.real.time.topic=true on the controller,
// this should also switch the store to a new real-time topic name
String[] adminToolArgs = new String[] { "--url", parentControllerClient.getLeaderControllerUrl(), "--cluster",
clusterName, "--store", storeName, "--update-store", "--partition-count", "2" };
AdminTool.main(adminToolArgs);
validateRealTimeTopic(parentControllerClient, storeName);
validateRealTimeTopicAcrossChildRegions(storeName, clusterName);
}
}

private void validateRealTimeTopic(ControllerClient controllerClient, String storeName) {
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreInfo storeInfo = controllerClient.getStore(storeName).getStore();
String realTimeTopicNameInVersion = Utils.getRealTimeTopicName(storeInfo);
String expectedRealTimeTopicNameInStoreConfig = Utils.createNewRealTimeTopicName(realTimeTopicNameInVersion);
String actualRealTimeTopicNameInStoreConfig = storeInfo.getHybridStoreConfig().getRealTimeTopicName();
Assert.assertNotEquals(expectedRealTimeTopicNameInStoreConfig, realTimeTopicNameInVersion);
Assert.assertEquals(expectedRealTimeTopicNameInStoreConfig, actualRealTimeTopicNameInStoreConfig);
});
}

private void validateRealTimeTopicAcrossChildRegions(String storeName, String clusterName) {
for (VeniceMultiClusterWrapper childRegion: childDatacenters) {
try (ControllerClient childControllerClient =
new ControllerClient(clusterName, childRegion.getControllerConnectString())) {
validateRealTimeTopic(childControllerClient, storeName);
}
}
}
}
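
The test above drives the partition-count change through AdminTool's --update-store command; roughly the same request could be issued directly from the controller client, as sketched below. The setPartitionCount setter on UpdateStoreQueryParams is assumed here rather than shown in this diff.

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;

public class PartitionCountUpdateSketch {
  // Hypothetical helper: bumps a store's partition count; with update.real.time.topic=true
  // on the controller, the store should also be pointed at a new real-time topic name.
  static ControllerResponse bumpPartitionCount(ControllerClient client, String storeName, int newCount) {
    return client.updateStore(storeName, new UpdateStoreQueryParams().setPartitionCount(newCount));
  }
}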
@@ -163,6 +163,7 @@
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_DELAY_FACTOR;
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.UPDATE_REAL_TIME_TOPIC;
import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR;
import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH;
import static com.linkedin.venice.ConfigKeys.VENICE_STORAGE_CLUSTER_LEADER_HAAS;
@@ -534,6 +535,7 @@ public class VeniceControllerClusterConfig {
private final long serviceDiscoveryRegistrationRetryMS;

private Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
private boolean updateRealTimeTopic;

public VeniceControllerClusterConfig(VeniceProperties props) {
this.props = props;
@@ -978,6 +980,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.serviceDiscoveryRegistrationRetryMS =
props.getLong(SERVICE_DISCOVERY_REGISTRATION_RETRY_MS, 30L * Time.MS_PER_SECOND);
this.pushJobUserErrorCheckpoints = parsePushJobUserErrorCheckpoints(props);
this.updateRealTimeTopic = props.getBoolean(UPDATE_REAL_TIME_TOPIC, false);
}

public VeniceProperties getProps() {
@@ -1767,6 +1770,10 @@ public int getDanglingTopicOccurrenceThresholdForCleanup() {
return danglingTopicOccurrenceThresholdForCleanup;
}

public boolean getUpdateRealTimeTopic() {
return updateRealTimeTopic;
}

/**
* A function that would put a k/v pair into a map with some processing works.
*/
@@ -286,4 +286,8 @@ public long getServiceDiscoveryRegistrationRetryMS() {
public List<String> getControllerInstanceTagList() {
return getCommonConfig().getControllerInstanceTagList();
}

public boolean getUpdateRealTimeTopic() {
return getCommonConfig().getUpdateRealTimeTopic();
}
}
@@ -4272,6 +4272,10 @@ public void setStorePartitionCount(String clusterName, String storeName, int par
// version.getPartitionCount()
// is read only in getRealTimeTopic and createInternalStore creation, so modifying currentVersion should not have
// any effect.
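// When the controller flag is enabled and the partition count of a hybrid store actually changes,
// point the store at a fresh real-time topic name before the new count is persisted below.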
if (multiClusterConfigs.getUpdateRealTimeTopic() && store.isHybrid()
&& partitionCount != store.getPartitionCount()) {
updateRealTimeTopicName(store);
}
if (partitionCount != 0) {
store.setPartitionCount(partitionCount);
} else {
@@ -4285,7 +4289,16 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newPartitionCount) {
String errorMessagePrefix = "Store update error for " + store.getName() + " in cluster: " + clusterName + ": ";
VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
int maxPartitionNum = clusterConfig.getMaxNumberOfPartitions();

if (store.isHybrid() && store.getPartitionCount() != newPartitionCount) {
if (multiClusterConfigs.getUpdateRealTimeTopic() && newPartitionCount <= maxPartitionNum
&& newPartitionCount >= 0) {
LOGGER.info(
    "Allow updating store {} partition count to {} because `updateRealTimeTopic` is true.",
    store.getName(),
    newPartitionCount);
return;
}
// Allow the update if partition count is not configured and the new partition count matches RT partition count
if (store.getPartitionCount() == 0) {
TopicManager topicManager;
@@ -4308,7 +4321,6 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
throw new VeniceHttpException(HttpStatus.SC_BAD_REQUEST, errorMessage, ErrorType.INVALID_CONFIG);
}

int maxPartitionNum = clusterConfig.getMaxNumberOfPartitions();
if (newPartitionCount > maxPartitionNum) {
String errorMessage =
errorMessagePrefix + "Partition count: " + newPartitionCount + " should be less than max: " + maxPartitionNum;
Expand All @@ -4322,6 +4334,20 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
}
}

private void updateRealTimeTopicName(Store store) {
String oldRealTimeTopicName = Utils.getRealTimeTopicName(store);
String newRealTimeTopicName;
PubSubTopic newRealTimeTopic;

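// Keep bumping the version suffix (e.g. storeName_rt -> storeName_v2_rt -> storeName_v3_rt)
// until the candidate topic name does not already exist in the pub-sub cluster.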
do {
newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName);
newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName);
oldRealTimeTopicName = newRealTimeTopicName;
} while (getTopicManager().containsTopic(newRealTimeTopic));

store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName);
}

void setStorePartitionerConfig(String clusterName, String storeName, PartitionerConfig partitionerConfig) {
storeMetadataUpdate(clusterName, storeName, store -> {
// Only amplification factor is allowed to be changed if the store is a hybrid store.
@@ -4799,7 +4825,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<Long> hybridTimeLagThreshold = params.getHybridTimeLagThreshold();
Optional<DataReplicationPolicy> hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy();
Optional<BufferReplayPolicy> hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy();
Optional<String> realTimeTopicName = params.getRealTimeTopicName();
Optional<String> realTimeTopicName = Optional.empty(); // real time topic name should only be changed during
// partition count update
Optional<Boolean> accessControlled = params.getAccessControlled();
Optional<CompressionStrategy> compressionStrategy = params.getCompressionStrategy();
Optional<Boolean> clientDecompressionEnabled = params.getClientDecompressionEnabled();
Expand Down