Skip to content

Commit

Permalink
add updateRealTimeTopic arg in update store admin command
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Dec 17, 2024
1 parent bc71696 commit fb80bf4
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet);
integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet);
booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet);
genericParam(cmd, Arg.REAL_TIME_TOPIC_NAME, s -> s, params::setRealTimeTopicName, argSet);
genericParam(cmd, Arg.SET_REAL_TIME_TOPIC, s -> s, params::setRealTimeTopicName, argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public enum Arg {
DAVINCI_HEARTBEAT_REPORTED(
"dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats"
), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"),
REAL_TIME_TOPIC_NAME("real-time-topic-name", "rttn", true, "Create and set a new real time topic");
SET_REAL_TIME_TOPIC("SET-real-time-topic", "rttn", true, "Set a new real time topic");

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED;
import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.Arg.SERVER_URL;
import static com.linkedin.venice.Arg.SET_REAL_TIME_TOPIC;
import static com.linkedin.venice.Arg.SKIP_DIV;
import static com.linkedin.venice.Arg.SKIP_LAST_STORE_CREATION;
import static com.linkedin.venice.Arg.SOURCE_FABRIC;
Expand Down Expand Up @@ -279,7 +280,7 @@ public enum Command {
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED,
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION,
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION }
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION, SET_REAL_TIME_TOPIC }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,5 @@ public class ControllerApiConstants {
public static final String TARGET_SWAP_REGION = "target_swap_region";
public static final String TARGET_SWAP_REGION_WAIT_TIME = "target_swap_region_wait_time";
public static final String IS_DAVINCI_HEARTBEAT_REPORTED = "is_davinci_heartbeat_reported";
public static final String UPDATE_REAL_TIME_TOPIC = "update_real_time_topic";
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATED_CONFIGS_LIST;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.UPDATE_REAL_TIME_TOPIC;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED;

Expand Down Expand Up @@ -772,6 +773,14 @@ public Optional<Boolean> getIsDavinciHeartbeatReported() {
return getBoolean(IS_DAVINCI_HEARTBEAT_REPORTED);
}

public UpdateStoreQueryParams setUpdateRealTimeTopic(boolean updateRealTimeTopic) {
return putBoolean(UPDATE_REAL_TIME_TOPIC, updateRealTimeTopic);
}

public Optional<Boolean> getUpdateRealTimeTopic() {
return getBoolean(UPDATE_REAL_TIME_TOPIC);
}

// ***************** above this line are getters and setters *****************
private UpdateStoreQueryParams putInteger(String name, int value) {
return (UpdateStoreQueryParams) add(name, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,15 @@ void validateAndMaybeRetrySystemStoreAutoCreation(

void setStoreOwner(String clusterName, String storeName, String owner);

void setStorePartitionCount(String clusterName, String storeName, int partitionCount);
default void setStorePartitionCount(String clusterName, String storeName, int partitionCount) {
setStorePartitionCount(clusterName, storeName, partitionCount, Optional.of(false));
}

void setStorePartitionCount(
String clusterName,
String storeName,
int partitionCount,
Optional<Boolean> updateRealTimeTopic);

void setStoreReadability(String clusterName, String storeName, boolean desiredReadability);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4264,14 +4264,21 @@ public void setStoreOwner(String clusterName, String storeName, String owner) {
* would only change the number of partition for the following pushes. Current version would not be changed.
*/
@Override
public void setStorePartitionCount(String clusterName, String storeName, int partitionCount) {
public void setStorePartitionCount(
String clusterName,
String storeName,
int partitionCount,
Optional<Boolean> updateRealTimeTopic) {
VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
storeMetadataUpdate(clusterName, storeName, store -> {
preCheckStorePartitionCountUpdate(clusterName, store, partitionCount);
preCheckStorePartitionCountUpdate(clusterName, store, partitionCount, updateRealTimeTopic);
// Do not update the partitionCount on the store.version as version config is immutable. The
// version.getPartitionCount()
// is read only in getRealTimeTopic and createInternalStore creation, so modifying currentVersion should not have
// any effect.
if (partitionCount != store.getPartitionCount()) {
updateRealTimeTopic(store, clusterName, partitionCount);
}
if (partitionCount != 0) {
store.setPartitionCount(partitionCount);
} else {
Expand All @@ -4282,10 +4289,23 @@ public void setStorePartitionCount(String clusterName, String storeName, int par
});
}

void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newPartitionCount) {
void preCheckStorePartitionCountUpdate(
String clusterName,
Store store,
int newPartitionCount,
Optional<Boolean> updateRealTimeTopic) {
String errorMessagePrefix = "Store update error for " + store.getName() + " in cluster: " + clusterName + ": ";
VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
int maxPartitionNum = clusterConfig.getMaxNumberOfPartitions();

if (store.isHybrid() && store.getPartitionCount() != newPartitionCount) {
if (updateRealTimeTopic.isPresent() && updateRealTimeTopic.get() && newPartitionCount <= maxPartitionNum
&& newPartitionCount >= 0) {
LOGGER.info(
"Allow updating store " + store.getName() + " partition count to " + newPartitionCount
+ " because `updateRealTimeTopic` is true.");
return;
}
// Allow the update if partition count is not configured and the new partition count matches RT partition count
if (store.getPartitionCount() == 0) {
TopicManager topicManager;
Expand All @@ -4308,7 +4328,6 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
throw new VeniceHttpException(HttpStatus.SC_BAD_REQUEST, errorMessage, ErrorType.INVALID_CONFIG);
}

int maxPartitionNum = clusterConfig.getMaxNumberOfPartitions();
if (newPartitionCount > maxPartitionNum) {
String errorMessage =
errorMessagePrefix + "Partition count: " + newPartitionCount + " should be less than max: " + maxPartitionNum;
Expand All @@ -4322,6 +4341,41 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
}
}

private void updateRealTimeTopic(Store store, String clusterName, int partitionCount) {
String storeName = store.getName();
String oldRealTimeTopicName = Utils.getRealTimeTopicName(store);
String newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName);
PubSubTopic newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName);

HelixVeniceClusterResources resources = getHelixVeniceClusterResources(storeName);
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
// The topic might be created by another thread already. Check before creating.
if (getTopicManager().containsTopic(newRealTimeTopic)) {
return;
}
ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
if (!store.isHybrid() && !store.isWriteComputationEnabled() && !store.isSystemStore()) {
logAndThrow("Store " + storeName + " is not hybrid, refusing to return a realtime topic");
}

VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
getTopicManager().createTopic(
newRealTimeTopic,
partitionCount,
clusterConfig.getKafkaReplicationFactorRTTopics(),
store.getRetentionTime(),
false,
clusterConfig.getMinInSyncReplicasRealTimeTopics(),
false);
LOGGER.warn(
"Creating real time topic per topic request for store: {}. "
+ "Buffer replay won't start for any existing versions",
storeName);
store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName);
repository.updateStore(store);
}
}

void setStorePartitionerConfig(String clusterName, String storeName, PartitionerConfig partitionerConfig) {
storeMetadataUpdate(clusterName, storeName, store -> {
// Only amplification factor is allowed to be changed if the store is a hybrid store.
Expand Down Expand Up @@ -4843,6 +4897,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<String> targetSwapRegion = params.getTargetSwapRegion();
Optional<Integer> targetSwapRegionWaitTime = params.getTargetRegionSwapWaitTime();
Optional<Boolean> isDavinciHeartbeatReported = params.getIsDavinciHeartbeatReported();
Optional<Boolean> updateRealTimeTopic = params.getUpdateRealTimeTopic();

final Optional<HybridStoreConfig> newHybridStoreConfig;
if (hybridRewindSeconds.isPresent() || hybridOffsetLagThreshold.isPresent() || hybridTimeLagThreshold.isPresent()
Expand Down Expand Up @@ -4874,7 +4929,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
}

if (partitionCount.isPresent()) {
setStorePartitionCount(clusterName, storeName, partitionCount.get());
setStorePartitionCount(clusterName, storeName, partitionCount.get(), updateRealTimeTopic);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2109,7 +2109,11 @@ public void setStoreOwner(String clusterName, String storeName, String owner) {
* admin message to the admin topic.
*/
@Override
public void setStorePartitionCount(String clusterName, String storeName, int partitionCount) {
public void setStorePartitionCount(
String clusterName,
String storeName,
int partitionCount,
Optional<Boolean> updateRealTimeTopic) {
acquireAdminMessageLock(clusterName, storeName);
try {
getVeniceHelixAdmin().checkPreConditionForUpdateStoreMetadata(clusterName, storeName);
Expand Down Expand Up @@ -2323,7 +2327,8 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
}

if (partitionCount.isPresent()) {
getVeniceHelixAdmin().preCheckStorePartitionCountUpdate(clusterName, currStore, partitionCount.get());
getVeniceHelixAdmin()
.preCheckStorePartitionCountUpdate(clusterName, currStore, partitionCount.get(), Optional.of(false));
setStore.partitionNum = partitionCount.get();
updatedConfigsList.add(PARTITION_COUNT);
} else {
Expand Down

0 comments on commit fb80bf4

Please sign in to comment.