From e458d652e68018f74a530237b7f01fc66bca1215 Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Fri, 1 Nov 2024 11:06:48 -0700 Subject: [PATCH] add rt topic name in store config fix serialization fix tests fix documentation move methods from Version to Utils removing version changes add tests fix bug in SIT --- .../kafka/consumer/StoreIngestionTask.java | 4 +- .../consumer/StoreIngestionTaskTest.java | 4 +- .../consumer/TopicExistenceCheckerTest.java | 4 +- .../DefaultPushJobHeartbeatSenderFactory.java | 4 +- .../heartbeat/TestPushJobHeartbeatSender.java | 1 + .../PushStatusStoreVeniceWriterCache.java | 4 +- .../venice/system/store/MetaStoreWriter.java | 6 +-- .../java/com/linkedin/venice/utils/Utils.java | 2 +- .../com/linkedin/venice/utils/UtilsTest.java | 2 +- .../consumer/ConsumerIntegrationTest.java | 4 +- ...VeniceHelixAdminWithSharedEnvironment.java | 1 + .../TestSeparateRealtimeTopicIngestion.java | 2 +- .../ParticipantStoreClientsManager.java | 5 +-- .../UserSystemStoreLifeCycleHelper.java | 3 +- .../venice/controller/VeniceHelixAdmin.java | 40 ++++++------------- .../controller/VeniceParentHelixAdmin.java | 10 +---- .../controller/server/CreateVersion.java | 6 +-- .../controller/server/StoresRoutes.java | 14 ++++++- .../control/RealTimeTopicSwitcher.java | 3 +- .../pushmonitor/AbstractPushMonitor.java | 3 +- .../controller/TestVeniceHelixAdmin.java | 8 +++- .../venice/router/MetaDataHandler.java | 3 +- 22 files changed, 68 insertions(+), 65 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 62e82335fe2..d376ed85281 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -381,7 +381,7 @@ public StoreIngestionTask( this.versionTopic = pubSubTopicRepository.getTopic(kafkaVersionTopic); this.storeName = versionTopic.getStoreName(); this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName); - this.realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(version)); this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic); this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY); this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>(); @@ -3884,7 +3884,7 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep // cluster these metastore writes could be spiky if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) { String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName); - PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName)); + PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) { metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index fe85fbf9388..adff67085a2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -5191,7 +5191,7 @@ public void testResolveTopicPartitionWithKafkaURL() { doCallRealMethod().when(pcs).getSourceTopicPartition(any()); String store = "test_store"; String kafkaUrl = "localhost:1234"; - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store)); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store)); PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store)); PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1)); Assert.assertEquals( @@ -5215,7 +5215,7 @@ public void testUnsubscribeFromTopic() { PartitionConsumptionState pcs = mock(PartitionConsumptionState.class); String store = "test_store"; String kafkaUrl = "localhost:1234"; - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store)); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store)); PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store)); PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1)); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/TopicExistenceCheckerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/TopicExistenceCheckerTest.java index 9d75bda11a3..3de3e8ed1c2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/TopicExistenceCheckerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/TopicExistenceCheckerTest.java @@ -7,6 +7,7 @@ import com.linkedin.venice.exceptions.VeniceNoStoreException; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import org.testng.Assert; import org.testng.annotations.Test; @@ -24,7 +25,8 @@ public void testMetadataBasedTopicExistenceChecker() { ReadOnlyStoreRepository repository = mock(ReadOnlyStoreRepository.class); Store store = mock(Store.class); - doReturn(new VersionImpl("existingTopic", 123)).when(store).getVersion(123); + doReturn(new VersionImpl("existingTopic", 123, "existingTopic" + Version.REAL_TIME_TOPIC_SUFFIX)).when(store) + .getVersion(123); doReturn(store).when(repository).getStoreOrThrow("existingTopic"); doThrow(new VeniceNoStoreException(nontExitingTopic1)).when(repository).getStoreOrThrow("non-existingTopic"); doReturn(true).when(store).isHybrid(); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java index 87a42be4135..dd2b3107140 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/heartbeat/DefaultPushJobHeartbeatSenderFactory.java @@ -13,10 +13,10 @@ import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.PartitionerConfig; import com.linkedin.venice.meta.StoreInfo; -import com.linkedin.venice.meta.Version; import com.linkedin.venice.partitioner.VenicePartitioner; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.PartitionUtils; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender( StoreInfo storeInfo = heartBeatStoreResponse.getStore(); PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig(); int partitionNum = storeInfo.getPartitionCount(); - String heartbeatKafkaTopicName = Version.composeRealTimeTopic(heartbeatStoreName); + String heartbeatKafkaTopicName = Utils.getRealTimeTopicName(storeInfo); VeniceWriter veniceWriter = getVeniceWriter( heartbeatKafkaTopicName, partitionerConfig, diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/heartbeat/TestPushJobHeartbeatSender.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/heartbeat/TestPushJobHeartbeatSender.java index 72e5cdfe171..1b098e0217e 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/heartbeat/TestPushJobHeartbeatSender.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/heartbeat/TestPushJobHeartbeatSender.java @@ -40,6 +40,7 @@ public void testHeartbeatSenderCreation() { doReturn(partitionerConfig).when(storeInfo).getPartitionerConfig(); doReturn(storeInfo).when(storeResponse).getStore(); doReturn(storeResponse).when(controllerClient).getStore(heartbeatStoreName); + doReturn(heartbeatStoreName).when(storeInfo).getName(); // Value Schema prepare. MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index c3ea4ccb6c1..a82ed760e64 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -1,9 +1,9 @@ package com.linkedin.venice.pushstatushelper; import com.linkedin.venice.common.VeniceSystemStoreUtils; -import com.linkedin.venice.meta.Version; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -35,7 +35,7 @@ public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schem public VeniceWriter prepareVeniceWriter(String storeName) { return veniceWriters.computeIfAbsent(storeName, s -> { - String rtTopic = Version.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); + String rtTopic = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic) .setKeySerializer( new VeniceAvroKafkaSerializer( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java index 8c4ff1c4479..a77e2d3767b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java @@ -7,7 +7,6 @@ import com.linkedin.venice.meta.Instance; import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.StoreConfig; -import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.ZKStore; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubTopic; @@ -24,6 +23,7 @@ import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus; import com.linkedin.venice.systemstore.schemas.StoreValueSchema; import com.linkedin.venice.systemstore.schemas.StoreValueSchemas; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceResourceCloseResult; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.writer.VeniceWriter; @@ -415,7 +415,7 @@ Map getMetaStoreWriterMap() { VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) { return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> { - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online"); } @@ -460,7 +460,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter, * to write a Control Message to the RT topic, and it could hang if the topic doesn't exist. * This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter. */ - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName)); if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) { LOGGER.info( "RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages", diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java index e52dfbb3c28..72b8fe5e119 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java @@ -1057,7 +1057,7 @@ public static PubSubTopic resolveLeaderTopicFromPubSubTopic( PubSubTopic pubSubTopic) { if (pubSubTopic.getPubSubTopicType().equals(PubSubTopicType.REALTIME_TOPIC) && pubSubTopic.getName().endsWith(SEPARATE_TOPIC_SUFFIX)) { - return pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(pubSubTopic.getStoreName())); + return pubSubTopicRepository.getTopic(composeRealTimeTopic(pubSubTopic.getStoreName())); } return pubSubTopic; } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java index 9f7e03705f5..ca0abdd4aa0 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java @@ -385,7 +385,7 @@ public void testGetLeaderTopicFromPubSubTopic() { PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); String store = "test_store"; PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(store, 1)); - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store)); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(store)); PubSubTopic separateRealTimeTopic = pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store)); Assert.assertEquals(Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, versionTopic), versionTopic); Assert.assertEquals(Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, realTimeTopic), realTimeTopic); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java index fe486b1b975..ac5c0002893 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java @@ -127,8 +127,10 @@ public void testSetUp() { topicName = Utils.getRealTimeTopicName( cluster.getLeaderVeniceController().getVeniceAdmin().getStore(cluster.getClusterName(), store)); controllerClient.emptyPush(store, "test_push", 1); + TestUtils.assertCommand(controllerClient.emptyPush(this.store, "test_push", 1), "empty push failed"); + TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> { - StoreResponse freshStoreResponse = controllerClient.getStore(store); + StoreResponse freshStoreResponse = controllerClient.getStore(this.store); Assert.assertFalse(freshStoreResponse.isError()); Assert.assertEquals( freshStoreResponse.getStore().getCurrentVersion(), diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java index 221757d71d9..cfb1e8cb9e6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithSharedEnvironment.java @@ -1672,6 +1672,7 @@ public void testNativeReplicationSourceFabric() { public void testGetIncrementalPushVersion() { String incrementalAndHybridEnabledStoreName = Utils.getUniqueString("testHybridStore"); veniceAdmin.createStore(clusterName, incrementalAndHybridEnabledStoreName, storeOwner, "\"string\"", "\"string\""); + veniceAdmin.getStore(clusterName, incrementalAndHybridEnabledStoreName); veniceAdmin.updateStore( clusterName, incrementalAndHybridEnabledStoreName, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java index 2f651a11775..9656e6ebffb 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSeparateRealtimeTopicIngestion.java @@ -209,7 +209,7 @@ public void testIncrementalPushPartialUpdate() throws IOException { // total key count. Assert.assertTrue(offsetVector.get(3) >= 100); }); - PubSubTopic realTimeTopic = PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic realTimeTopic = PUB_SUB_TOPIC_REPOSITORY.getTopic(Utils.composeRealTimeTopic(storeName)); PubSubTopic separateRealtimeTopic = PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeSeparateRealTimeTopic(storeName)); PubSubTopic versionTopicV1 = PUB_SUB_TOPIC_REPOSITORY.getTopic(Version.composeKafkaTopic(storeName, 1)); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java index f4c9bbb51bc..ffa900d3059 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/ParticipantStoreClientsManager.java @@ -9,7 +9,6 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.exceptions.VeniceException; -import com.linkedin.venice.meta.Version; import com.linkedin.venice.participant.protocol.ParticipantMessageKey; import com.linkedin.venice.participant.protocol.ParticipantMessageValue; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -69,8 +68,8 @@ public VeniceWriter getWriter(String clusterName) { return writeClients.computeIfAbsent(clusterName, k -> { int attempts = 0; boolean verified = false; - PubSubTopic topic = pubSubTopicRepository.getTopic( - Version.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + PubSubTopic topic = pubSubTopicRepository + .getTopic(Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); while (attempts < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS) { if (topicManagerRepository.getLocalTopicManager().containsTopicAndAllPartitionsAreOnline(topic)) { verified = true; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index 90968b87538..9eb30b62aa7 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -12,6 +12,7 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.pushmonitor.PushMonitorDelegator; import com.linkedin.venice.system.store.MetaStoreWriter; +import com.linkedin.venice.utils.Utils; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -152,7 +153,7 @@ public static void deleteSystemStore( default: throw new VeniceException("Unknown system store type: " + systemStoreName); } - admin.truncateKafkaTopic(Version.composeRealTimeTopic(systemStoreName)); + admin.truncateKafkaTopic(Utils.composeRealTimeTopic(systemStoreName)); } else { LOGGER.info("The RT topic for: {} will not be deleted since the user store is migrating", systemStoreName); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index be1026cdffc..33a7a6ab79e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -863,7 +863,7 @@ public synchronized void initStorageCluster(String clusterName) { if (multiClusterConfigs.getControllerConfig(clusterName).isParticipantMessageStoreEnabled()) { participantMessageStoreRTTMap.put( clusterName, - Version.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); + Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(clusterName))); } waitUntilClusterResourceIsVisibleInEV(clusterName); } @@ -1087,7 +1087,7 @@ private void deleteStore( if (!store.isMigrating()) { // for RT topic block on deletion so that next create store does not see the lingering RT topic which could // have different partition count - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); truncateKafkaTopic(rtTopic.getName()); if (waitOnRTTopicDeletion && getTopicManager().containsTopic(rtTopic)) { throw new VeniceRetriableException("Waiting for RT topic deletion for store: " + storeName); @@ -1294,8 +1294,7 @@ public void sendPushJobDetails(PushJobStatusRecordKey key, PushJobDetails value) String pushJobDetailsStoreName = VeniceSystemStoreUtils.getPushJobDetailsStoreName(); if (pushJobDetailsRTTopic == null) { // Verify the RT topic exists and give some time in case it's getting created. - PubSubTopic expectedRTTopic = - pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(pushJobDetailsStoreName)); + PubSubTopic expectedRTTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(pushJobDetailsStoreName)); for (int attempt = 0; attempt < INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS; attempt++) { if (attempt > 0) Utils.sleep(INTERNAL_STORE_RTT_RETRY_BACKOFF_MS); @@ -2752,7 +2751,7 @@ private Pair addVersion( && store.getHybridStoreConfig().getDataReplicationPolicy() == DataReplicationPolicy.AGGREGATE) || store.isIncrementalPushEnabled())) { // Create rt topic in parent colo if the store is aggregate mode hybrid store - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (!getTopicManager().containsTopic(realTimeTopic)) { getTopicManager().createTopic( realTimeTopic, @@ -3127,21 +3126,6 @@ private Optional getVersionWithPushId(String clusterName, String storeN return Optional.empty(); } - /** - * Get the real time topic name for a given store. If the topic is not created in Kafka, it creates the - * real time topic and returns the topic name. - * @param clusterName name of the Venice cluster. - * @param storeName name of the store. - * @return name of the store's real time topic name. - */ - @Override - public String getRealTimeTopic(String clusterName, String storeName) { - checkControllerLeadershipFor(clusterName); - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); - ensureRealTimeTopicIsReady(clusterName, realTimeTopic); - return realTimeTopic.getName(); - } - /** * Get the real time topic name for a given store. If the topic is not created in Kafka, it creates the * real time topic and returns the topic name. @@ -3271,7 +3255,7 @@ public Version getIncrementalPushVersion(String clusterName, String storeName) { + version.getNumber() + " Store:" + storeName); } - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic) || isTopicTruncated(rtTopic.getName())) { resources.getVeniceAdminStats().recordUnexpectedTopicAbsenceCount(); throw new VeniceException( @@ -3529,7 +3513,7 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver } } } - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (!store.isHybrid() && getTopicManager().containsTopic(rtTopic)) { safeDeleteRTTopic(clusterName, storeName); } @@ -3549,7 +3533,7 @@ private boolean hasFatalDataValidationError(PushMonitor pushMonitor, String topi private void safeDeleteRTTopic(String clusterName, String storeName) { boolean rtDeletionPermitted = isRTTopicDeletionPermittedByAllControllers(clusterName, storeName); if (rtDeletionPermitted) { - String rtTopicToDelete = Version.composeRealTimeTopic(storeName); + String rtTopicToDelete = Utils.composeRealTimeTopic(storeName); deleteRTTopicFromAllFabrics(rtTopicToDelete, clusterName); // Check if there is incremental push topic exist. If yes, delete it and send out to let other controller to // delete it. @@ -3565,7 +3549,7 @@ public boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, St // to see if any version is still using RT before deleting the RT. // Since we perform this check everytime when a store version is deleted we can afford to do best effort // approach if some fabrics are unavailable or out of sync (temporarily). - String rtTopicName = Version.composeRealTimeTopic(storeName); + String rtTopicName = Utils.composeRealTimeTopic(storeName); Map controllerClientMap = getControllerClientMap(clusterName); for (Map.Entry controllerClientEntry: controllerClientMap.entrySet()) { StoreResponse storeResponse = controllerClientEntry.getValue().getStore(storeName); @@ -4279,7 +4263,7 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP } else { topicManager = getTopicManager(); } - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store.getName())); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (topicManager.containsTopic(realTimeTopic) && topicManager.getPartitionCount(realTimeTopic) == newPartitionCount) { LOGGER.info("Allow updating store " + store.getName() + " partition count to " + newPartitionCount); @@ -4759,7 +4743,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto if (originalStore.isHybrid()) { // If this is a hybrid store, always try to disable compaction if RT topic exists. try { - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(originalStore)); getTopicManager().updateTopicCompactionPolicy(rtTopic, false); } catch (PubSubTopicDoesNotExistException e) { LOGGER.error("Could not find realtime topic for hybrid store {}", storeName); @@ -4940,7 +4924,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto && !store.isSystemStore())); } store.setHybridStoreConfig(finalHybridConfig); - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic)) { // RT already exists, ensure the retention is correct getTopicManager() @@ -5143,7 +5127,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto e); // rollback to original store storeMetadataUpdate(clusterName, storeName, store -> originalStore); - PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(originalStore)); if (originalStore.isHybrid() && newHybridStoreConfig.isPresent() && getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic)) { // Ensure the topic retention is rolled back too diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 4c1472e5196..74365eb9b50 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -1709,14 +1709,6 @@ private AddVersion getAddVersionMessage( return addVersion; } - /** - * @see VeniceHelixAdmin#getRealTimeTopic(String, String) - */ - @Override - public String getRealTimeTopic(String clusterName, String storeName) { - return getVeniceHelixAdmin().getRealTimeTopic(clusterName, storeName); - } - /** * @see VeniceHelixAdmin#getRealTimeTopic(String, Store) */ @@ -1753,7 +1745,7 @@ Version getIncrementalPushVersion(Version incrementalPushVersion, ExecutionStatu throw new VeniceException("Cannot start incremental push since batch push is on going." + " store: " + storeName); } - String incrementalPushTopic = Version.composeRealTimeTopic(storeName); + String incrementalPushTopic = Utils.composeRealTimeTopic(storeName); if (status.isError() || getVeniceHelixAdmin().isTopicTruncated(incrementalPushTopic)) { throw new VeniceException( "Cannot start incremental push since previous batch push has failed. Please run another bash job." diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java index a148d61ab3c..887710f6d8a 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java @@ -268,7 +268,7 @@ public Route requestTopicForPushing(Admin admin) { * Otherwise topic existence check fails internally. */ if (pushType.isIncremental() && isWriteComputeEnabled) { - admin.getRealTimeTopic(clusterName, storeName); + admin.getRealTimeTopic(clusterName, store); } final Optional certInRequest = @@ -311,7 +311,7 @@ public Route requestTopicForPushing(Admin admin) { admin.getSeparateRealTimeTopic(clusterName, storeName); responseTopic = Version.composeSeparateRealTimeTopic(storeName); } else { - responseTopic = Version.composeRealTimeTopic(storeName); + responseTopic = Utils.getRealTimeTopicName(store); } // disable amplificationFactor logic on real-time topic responseObject.setAmplificationFactor(1); @@ -396,7 +396,7 @@ public Route requestTopicForPushing(Admin admin) { } } - String realTimeTopic = admin.getRealTimeTopic(clusterName, storeName); + String realTimeTopic = admin.getRealTimeTopic(clusterName, store); responseObject.setKafkaTopic(realTimeTopic); // disable amplificationFactor logic on real-time topic responseObject.setAmplificationFactor(1); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java index 6bfa7f51b3a..9b45e1adec6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java @@ -109,11 +109,15 @@ import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.http.HttpStatus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import spark.Request; import spark.Route; public class StoresRoutes extends AbstractRoute { + private static final Logger LOGGER = LogManager.getLogger(StoresRoutes.class); + private final PubSubTopicRepository pubSubTopicRepository; public StoresRoutes( @@ -857,7 +861,15 @@ public void internalHandle(Request request, MultiStoreTopicsResponse veniceRespo List deletableTopicsList = new ArrayList<>(); int minNumberOfUnusedKafkaTopicsToPreserve = admin.getMinNumberOfUnusedKafkaTopicsToPreserve(); allStoreTopics.forEach((storeName, topicsWithRetention) -> { - PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + String cluster; + try { + cluster = admin.discoverCluster(storeName).getFirst(); + } catch (VeniceNoStoreException e) { + LOGGER.warn("Store " + storeName + " does not exist. Skipping it."); + return; + } + Store store = admin.getStore(cluster, storeName); + PubSubTopic realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); if (topicsWithRetention.containsKey(realTimeTopic)) { if (admin.isTopicTruncatedBasedOnRetention(topicsWithRetention.get(realTimeTopic))) { deletableTopicsList.add(realTimeTopic.getName()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java index f20209b903f..ecb0605e990 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java @@ -20,6 +20,7 @@ import com.linkedin.venice.utils.StoreUtils; import com.linkedin.venice.utils.SystemTime; import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -244,7 +245,7 @@ public void transmitVersionSwapMessage(Store store, int previousVersion, int nex } // Write the thing! try (VeniceWriter veniceWriter = getVeniceWriterFactory().createVeniceWriter( - new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(store.getName())).setTime(getTimer()) + new VeniceWriterOptions.Builder(Utils.getRealTimeTopicName(store)).setTime(getTimer()) .setPartitionCount(previousStoreVersion.getPartitionCount()) .build())) { veniceWriter.broadcastVersionSwap( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index 9a5a78e61f0..891f852e14c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -32,6 +32,7 @@ import com.linkedin.venice.throttle.EventThrottler; import com.linkedin.venice.utils.HelixUtils; import com.linkedin.venice.utils.Time; +import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.utils.locks.ClusterLockManager; @@ -932,7 +933,7 @@ protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlin try { String newStatusDetails; realTimeTopicSwitcher.switchToRealTimeTopic( - Version.composeRealTimeTopic(storeName), + Utils.getRealTimeTopicName(store), offlinePushStatus.getKafkaTopic(), store, aggregateRealTimeSourceKafkaUrl, diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java index 7b864229fbf..5ed9d3a2452 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java @@ -1,7 +1,13 @@ package com.linkedin.venice.controller; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import com.linkedin.venice.controller.stats.DisabledPartitionStats; diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java index dcc37bfdae5..5cfc2cef5a8 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java @@ -71,6 +71,7 @@ import com.linkedin.venice.utils.ExceptionUtils; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.RedundantExceptionFilter; +import com.linkedin.venice.utils.Utils; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -814,7 +815,7 @@ private void handleRequestTopic(ChannelHandlerContext ctx, VenicePathParserHelpe responseObject.setCluster(clusterName); responseObject.setName(storeName); responseObject.setPartitions(currentVersion.getPartitionCount()); - responseObject.setKafkaTopic(Version.composeRealTimeTopic(storeName)); + responseObject.setKafkaTopic(Utils.getRealTimeTopicName(store)); // RT topic only supports NO_OP compression responseObject.setCompressionStrategy(CompressionStrategy.NO_OP);