Skip to content

Commit

Permalink
add updateRealTimeTopic arg in update store admin command
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Dec 17, 2024
1 parent fb80bf4 commit ac0f396
Show file tree
Hide file tree
Showing 10 changed files with 1,188 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,6 @@ public static void main(String[] args) throws Exception {
case DUMP_HOST_HEARTBEAT:
dumpHostHeartbeat(cmd);
break;
case CREATE_REAL_TIME_TOPIC:
createRealTimeTopic(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -1179,7 +1176,6 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
genericParam(cmd, Arg.TARGET_SWAP_REGION, s -> s, p -> params.setTargetRegionSwap(p), argSet);
integerParam(cmd, Arg.TARGET_SWAP_REGION_WAIT_TIME, p -> params.setTargetRegionSwapWaitTime(p), argSet);
booleanParam(cmd, Arg.DAVINCI_HEARTBEAT_REPORTED, p -> params.setIsDavinciHeartbeatReported(p), argSet);
genericParam(cmd, Arg.SET_REAL_TIME_TOPIC, s -> s, params::setRealTimeTopicName, argSet);

/**
* {@link Arg#REPLICATE_ALL_CONFIGS} doesn't require parameters; once specified, it means true.
Expand Down Expand Up @@ -3399,11 +3395,4 @@ private static PubSubConsumerAdapter getConsumer(
return pubSubClientsFactory.getConsumerAdapterFactory()
.create(new VeniceProperties(consumerProps), false, pubSubMessageDeserializer, "admin-tool-topic-dumper");
}

private static void createRealTimeTopic(CommandLine cmd) {
String storeName = getRequiredArgument(cmd, Arg.STORE, Command.CREATE_REAL_TIME_TOPIC);
String partitionNum = getOptionalArgument(cmd, Arg.PARTITION_COUNT);
PartitionResponse response = controllerClient.createRealTimeTopic(storeName, partitionNum);
printSuccess(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ public enum Arg {
),
DAVINCI_HEARTBEAT_REPORTED(
"dvc-heartbeat-reported", "dvchb", true, "Flag to indicate whether DVC is bootstrapping and sending heartbeats"
), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config"),
SET_REAL_TIME_TOPIC("SET-real-time-topic", "rttn", true, "Set a new real time topic");
), ENABLE_STORE_MIGRATION("enable-store-migration", "esm", true, "Toggle store migration store config");

private final String argName;
private final String first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED;
import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.Arg.SERVER_URL;
import static com.linkedin.venice.Arg.SET_REAL_TIME_TOPIC;
import static com.linkedin.venice.Arg.SKIP_DIV;
import static com.linkedin.venice.Arg.SKIP_LAST_STORE_CREATION;
import static com.linkedin.venice.Arg.SOURCE_FABRIC;
Expand Down Expand Up @@ -280,7 +279,7 @@ public enum Command {
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED,
NEARLINE_PRODUCER_COMPRESSION_ENABLED, NEARLINE_PRODUCER_COUNT_PER_WRITER, TARGET_SWAP_REGION,
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION, SET_REAL_TIME_TOPIC }
TARGET_SWAP_REGION_WAIT_TIME, DAVINCI_HEARTBEAT_REPORTED, ENABLE_STORE_MIGRATION }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down Expand Up @@ -563,10 +562,6 @@ public enum Command {
"dump-host-heartbeat",
"Dump all heartbeat belong to a certain storage node. You can use topic/partition to filter specific resource, and you can choose to filter resources that are lagging.",
new Arg[] { SERVER_URL, KAFKA_TOPIC_NAME }, new Arg[] { PARTITION, LAG_FILTER_ENABLED }
),
CREATE_REAL_TIME_TOPIC(
"create-real-time-topic", "Create a real time topic for an existing store", new Arg[] { URL, STORE },
new Arg[] { CLUSTER, PARTITION_COUNT }
);

private final String commandName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -936,11 +936,6 @@ public StoppableNodeStatusResponse getAggregatedHealthStatus(
requestString.getBytes());
}

public PartitionResponse createRealTimeTopic(String storeName, String partitionNum) {
QueryParams params = newParams().add(NAME, storeName).add(PARTITION_COUNT, partitionNum);
return request(ControllerRoute.CREATE_REAL_TIME_TOPIC, params, PartitionResponse.class);
}

public MultiNodesStatusResponse listInstancesStatuses(boolean enableReplicas) {
QueryParams params = newParams();
if (enableReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ public enum ControllerRoute {
DELETE_UNUSED_VALUE_SCHEMAS(
"/delete_unused_value_schemas", HttpMethod.POST, Arrays.asList(CLUSTER, NAME),
ControllerApiConstants.VALUE_SCHEMA_IDS
), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)),
CREATE_REAL_TIME_TOPIC("/create_real_time_topic", HttpMethod.POST, Arrays.asList(NAME, PARTITION_COUNT));
), GET_INUSE_SCHEMA_IDS("/get_inuse_schema_ids", HttpMethod.GET, Arrays.asList(CLUSTER, NAME));

private final String path;
private final HttpMethod httpMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.InstanceStatus;
import com.linkedin.venice.meta.Store;
Expand Down Expand Up @@ -1121,66 +1120,6 @@ public void testCleanupInstanceCustomizedStates() {
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testCreateRealTimeTopicCall() throws IOException, ExecutionException, InterruptedException {
String clusterName = venice.getClusterNames()[0];
String storeName = Utils.getUniqueString("testCreateRealTimeTopicCall");
CloseableHttpAsyncClient httpClient = HttpClientUtils.getMinimalHttpClient(1, 1, Optional.empty());
httpClient.start();

VeniceHelixAdmin childControllerAdmin = venice.getChildRegions().get(0).getRandomController().getVeniceHelixAdmin();
childControllerAdmin.createStore(clusterName, storeName, "test", "\"string\"", "\"string\"");

UpdateStoreQueryParams updateStoreParams = new UpdateStoreQueryParams();
updateStoreParams.setIncrementalPushEnabled(true)
.setBackupStrategy(BackupStrategy.KEEP_MIN_VERSIONS)
.setNumVersionsToPreserve(3)
.setHybridRewindSeconds(1000)
.setHybridOffsetLagThreshold(1000);

childControllerAdmin.updateStore(clusterName, storeName, updateStoreParams);
childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "1", 1, 1);

// API call with all fields
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair(ControllerApiConstants.CLUSTER, clusterName));
params.add(new BasicNameValuePair(ControllerApiConstants.NAME, storeName));
params.add(new BasicNameValuePair(ControllerApiConstants.PARTITION_COUNT, "8"));

makeRealTimeTopicCall(httpClient, params);

childControllerAdmin.incrementVersionIdempotent(clusterName, storeName, "2", 1, 1);
this.controllerClient
.sendEmptyPushAndWait(storeName, Utils.getUniqueString("empty-push"), 1L, 60L * Time.MS_PER_SECOND);

List<Version> versions = childControllerAdmin.getStore(clusterName, storeName).getVersions();
Assert.assertEquals(versions.size(), 2);

String oldRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(0));
String newRealTimeTopicName = Utils.getRealTimeTopicName(versions.get(1));

Assert.assertNotEquals(oldRealTimeTopicName, newRealTimeTopicName);
Assert.assertTrue(
childControllerAdmin.getTopicManager()
.containsTopic(childControllerAdmin.getPubSubTopicRepository().getTopic(newRealTimeTopicName)));

httpClient.close();
}

private void makeRealTimeTopicCall(HttpAsyncClient httpClient, List<NameValuePair> params)
throws IOException, ExecutionException, InterruptedException {
// StringEntity entity = new StringEntity(OBJECT_MAPPER.writeValueAsString(payloads), ContentType.APPLICATION_JSON);

final HttpPost post = new HttpPost(
venice.getChildRegions().get(0).getControllerConnectString()
+ ControllerRoute.CREATE_REAL_TIME_TOPIC.getPath());
post.setEntity(new UrlEncodedFormEntity(params));
HttpResponse httpResponse = httpClient.execute(post, null).get();

Assert.assertEquals(httpResponse.getStatusLine().getStatusCode(), 200);
String responseString = IOUtils.toString(httpResponse.getEntity().getContent());
}

private void deleteStore(String storeName) {
parentControllerClient.enableStoreReadWrites(storeName, false);
parentControllerClient.deleteStore(storeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4277,7 +4277,7 @@ public void setStorePartitionCount(
// is read only in getRealTimeTopic and createInternalStore creation, so modifying currentVersion should not have
// any effect.
if (partitionCount != store.getPartitionCount()) {
updateRealTimeTopic(store, clusterName, partitionCount);
updateRealTimeTopicName(store);
}
if (partitionCount != 0) {
store.setPartitionCount(partitionCount);
Expand Down Expand Up @@ -4341,39 +4341,19 @@ void preCheckStorePartitionCountUpdate(
}
}

private void updateRealTimeTopic(Store store, String clusterName, int partitionCount) {
private void updateRealTimeTopicName(Store store) {
String storeName = store.getName();
String oldRealTimeTopicName = Utils.getRealTimeTopicName(store);
String newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName);
PubSubTopic newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName);
String newRealTimeTopicName;
PubSubTopic newRealTimeTopic;

HelixVeniceClusterResources resources = getHelixVeniceClusterResources(storeName);
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
// The topic might be created by another thread already. Check before creating.
if (getTopicManager().containsTopic(newRealTimeTopic)) {
return;
}
ReadWriteStoreRepository repository = resources.getStoreMetadataRepository();
if (!store.isHybrid() && !store.isWriteComputationEnabled() && !store.isSystemStore()) {
logAndThrow("Store " + storeName + " is not hybrid, refusing to return a realtime topic");
}
do {
newRealTimeTopicName = Utils.createNewRealTimeTopicName(oldRealTimeTopicName);
newRealTimeTopic = getPubSubTopicRepository().getTopic(newRealTimeTopicName);
oldRealTimeTopicName = newRealTimeTopicName;
} while (getTopicManager().containsTopic(newRealTimeTopic));

VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
getTopicManager().createTopic(
newRealTimeTopic,
partitionCount,
clusterConfig.getKafkaReplicationFactorRTTopics(),
store.getRetentionTime(),
false,
clusterConfig.getMinInSyncReplicasRealTimeTopics(),
false);
LOGGER.warn(
"Creating real time topic per topic request for store: {}. "
+ "Buffer replay won't start for any existing versions",
storeName);
store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName);
repository.updateStore(store);
}
store.getHybridStoreConfig().setRealTimeTopicName(newRealTimeTopicName);
}

void setStorePartitionerConfig(String clusterName, String storeName, PartitionerConfig partitionerConfig) {
Expand Down Expand Up @@ -4853,8 +4833,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
Optional<Long> hybridTimeLagThreshold = params.getHybridTimeLagThreshold();
Optional<DataReplicationPolicy> hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy();
Optional<BufferReplayPolicy> hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy();
Optional<String> realTimeTopicName = Optional.empty(); // to update real time topic name, there is a separate admin
// tool command
Optional<String> realTimeTopicName = Optional.empty(); // real time topic name should only be changed during
// partition count update
Optional<Boolean> accessControlled = params.getAccessControlled();
Optional<CompressionStrategy> compressionStrategy = params.getCompressionStrategy();
Optional<Boolean> clientDecompressionEnabled = params.getClientDecompressionEnabled();
Expand Down Expand Up @@ -6503,7 +6483,7 @@ private void throwStoreAlreadyExists(String clusterName, String storeName) {
throw new VeniceStoreAlreadyExistsException(storeName, clusterName);
}

public static void throwStoreDoesNotExist(String clusterName, String storeName) {
private static void throwStoreDoesNotExist(String clusterName, String storeName) {
String errorMessage = "Store:" + storeName + " does not exist in cluster:" + clusterName;
LOGGER.error(errorMessage);
throw new VeniceNoStoreException(storeName, clusterName);
Expand All @@ -6518,7 +6498,7 @@ private void throwClusterNotInitialized(String clusterName) {
throw new VeniceNoClusterException(clusterName);
}

public static void logAndThrow(String msg) {
private void logAndThrow(String msg) {
LOGGER.info(msg);
throw new VeniceException(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPARE_STORE;
import static com.linkedin.venice.controllerapi.ControllerRoute.COMPLETE_MIGRATION;
import static com.linkedin.venice.controllerapi.ControllerRoute.CONFIGURE_ACTIVE_ACTIVE_REPLICATION_FOR_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_REAL_TIME_TOPIC;
import static com.linkedin.venice.controllerapi.ControllerRoute.CREATE_STORAGE_PERSONA;
import static com.linkedin.venice.controllerapi.ControllerRoute.ClUSTER_HEALTH_INSTANCES;
import static com.linkedin.venice.controllerapi.ControllerRoute.DATA_RECOVERY;
Expand Down Expand Up @@ -651,10 +650,6 @@ public boolean startInner() throws Exception {
CLEANUP_INSTANCE_CUSTOMIZED_STATES.getPath(),
new VeniceParentControllerRegionStateHandler(admin, clusterRoutes.cleanupInstanceCustomizedStates(admin)));

httpService.post(
CREATE_REAL_TIME_TOPIC.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.createRealTimeTopic(admin)));

httpService.awaitInitialization(); // Wait for server to be initialized
Exception e = initFailure.get();
if (e != null) {
Expand Down
Loading

0 comments on commit ac0f396

Please sign in to comment.