Skip to content

Commit

Permalink
[controller] use new getRealTimeTopicName API - part-2 (#1386)
Browse files Browse the repository at this point in the history
chage api componseRealTimeTopic API reference
  • Loading branch information
arjun4084346 authored Dec 12, 2024
1 parent 7b03cd4 commit 26edffc
Show file tree
Hide file tree
Showing 22 changed files with 68 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public StoreIngestionTask(
this.versionTopic = pubSubTopicRepository.getTopic(kafkaVersionTopic);
this.storeName = versionTopic.getStoreName();
this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
this.realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(version));
this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -3895,7 +3895,7 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
// cluster these metastore writes could be spiky
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5206,7 +5206,7 @@ public void testResolveTopicPartitionWithKafkaURL() {
doCallRealMethod().when(pcs).getSourceTopicPartition(any());
String store = "test_store";
String kafkaUrl = "localhost:1234";
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store));
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store));
PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store));
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1));
Assert.assertEquals(
Expand All @@ -5230,7 +5230,7 @@ public void testUnsubscribeFromTopic() {
PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
String store = "test_store";
String kafkaUrl = "localhost:1234";
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store));
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store));
PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store));
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand All @@ -24,7 +25,8 @@ public void testMetadataBasedTopicExistenceChecker() {

ReadOnlyStoreRepository repository = mock(ReadOnlyStoreRepository.class);
Store store = mock(Store.class);
doReturn(new VersionImpl("existingTopic", 123)).when(store).getVersion(123);
doReturn(new VersionImpl("existingTopic", 123, "existingTopic" + Version.REAL_TIME_TOPIC_SUFFIX)).when(store)
.getVersion(123);
doReturn(store).when(repository).getStoreOrThrow("existingTopic");
doThrow(new VeniceNoStoreException(nontExitingTopic1)).when(repository).getStoreOrThrow("non-existingTopic");
doReturn(true).when(store).isHybrid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender(
StoreInfo storeInfo = heartBeatStoreResponse.getStore();
PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig();
int partitionNum = storeInfo.getPartitionCount();
String heartbeatKafkaTopicName = Version.composeRealTimeTopic(heartbeatStoreName);
String heartbeatKafkaTopicName = Utils.composeRealTimeTopic(heartbeatStoreName);
VeniceWriter<byte[], byte[], byte[]> veniceWriter = getVeniceWriter(
heartbeatKafkaTopicName,
partitionerConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void testHeartbeatSenderCreation() {
doReturn(partitionerConfig).when(storeInfo).getPartitionerConfig();
doReturn(storeInfo).when(storeResponse).getStore();
doReturn(storeResponse).when(controllerClient).getStore(heartbeatStoreName);
doReturn(heartbeatStoreName).when(storeInfo).getName();

// Value Schema prepare.
MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.linkedin.venice.pushstatushelper;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -35,7 +35,7 @@ public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schem

public VeniceWriter prepareVeniceWriter(String storeName) {
return veniceWriters.computeIfAbsent(storeName, s -> {
String rtTopic = Version.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
String rtTopic = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic)
.setKeySerializer(
new VeniceAvroKafkaSerializer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
Expand All @@ -24,6 +23,7 @@
import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
Expand Down Expand Up @@ -415,7 +415,7 @@ Map<String, VeniceWriter> getMetaStoreWriterMap() {

VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> {
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online");
}
Expand Down Expand Up @@ -460,7 +460,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter,
* to write a Control Message to the RT topic, and it could hang if the topic doesn't exist.
* This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter.
*/
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
LOGGER.info(
"RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ public static PubSubTopic resolveLeaderTopicFromPubSubTopic(
PubSubTopic pubSubTopic) {
if (pubSubTopic.getPubSubTopicType().equals(PubSubTopicType.REALTIME_TOPIC)
&& pubSubTopic.getName().endsWith(SEPARATE_TOPIC_SUFFIX)) {
return pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(pubSubTopic.getStoreName()));
return pubSubTopicRepository.getTopic(composeRealTimeTopic(pubSubTopic.getStoreName()));
}
return pubSubTopic;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public void testGetLeaderTopicFromPubSubTopic() {
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
String store = "test_store";
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1));
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store));
PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store));
PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store));
Assert.assertEquals(Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, versionTopic), versionTopic);
Assert.assertEquals(Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, realTimeTopic), realTimeTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ public void testSetUp() {
topicName = Utils.getRealTimeTopicName(
cluster.getLeaderVeniceController().getVeniceAdmin().getStore(cluster.getClusterName(), store));
controllerClient.emptyPush(store, "test_push", 1);
TestUtils.assertCommand(controllerClient.emptyPush(this.store, "test_push", 1), "empty push failed");

TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
StoreResponse freshStoreResponse = controllerClient.getStore(store);
StoreResponse freshStoreResponse = controllerClient.getStore(this.store);
Assert.assertFalse(freshStoreResponse.isError());
Assert.assertEquals(
freshStoreResponse.getStore().getCurrentVersion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,7 @@ public void testNativeReplicationSourceFabric() {
public void testGetIncrementalPushVersion() {
String incrementalAndHybridEnabledStoreName = Utils.getUniqueString("testHybridStore");
veniceAdmin.createStore(clusterName, incrementalAndHybridEnabledStoreName, storeOwner, "\"string\"", "\"string\"");
veniceAdmin.getStore(clusterName, incrementalAndHybridEnabledStoreName);
veniceAdmin.updateStore(
clusterName,
incrementalAndHybridEnabledStoreName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void testIncrementalPushPartialUpdate() throws IOException {
// total key count.
Assert.assertTrue(offsetVector.get(3) >= 100);
});
PubSubTopic realTimeTopic = PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName));
PubSubTopic realTimeTopic = PUB_SUB_TOPIC_REPOSITORY.getTopic(Utils.composeRealTimeTopic(storeName));
PubSubTopic separateRealtimeTopic =
PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeSeparateRealTimeTopic(storeName));
PubSubTopic versionTopicV1 = PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.participant.protocol.ParticipantMessageKey;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
Expand Down Expand Up @@ -69,8 +68,8 @@ public VeniceWriter getWriter(String clusterName) {
return writeClients.computeIfAbsent(clusterName, k -> {
int attempts = 0;
boolean verified = false;
PubSubTopic topic = pubSubTopicRepository.getTopic(
Version.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName)));
PubSubTopic topic = pubSubTopicRepository
.getTopic(Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName)));
while (attempts < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS) {
if (topicManagerRepository.getLocalTopicManager().containsTopicAndAllPartitionsAreOnline(topic)) {
verified = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.PushMonitorDelegator;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.utils.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -152,7 +153,7 @@ public static void deleteSystemStore(
default:
throw new VeniceException("Unknown system store type: " + systemStoreName);
}
admin.truncateKafkaTopic(Version.composeRealTimeTopic(systemStoreName));
admin.truncateKafkaTopic(Utils.composeRealTimeTopic(systemStoreName));
} else {
LOGGER.info("The RT topic for: {} will not be deleted since the user store is migrating", systemStoreName);
}
Expand Down
Loading

0 comments on commit 26edffc

Please sign in to comment.