diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 4ba209c348..6d8d0e9c5c 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -3539,7 +3539,8 @@ public Version getReferenceVersionForStreamingWrites(String clusterName, String * Create RT topic here in parent region. For child regions, RT should always be created as part of * {@link RealTimeTopicSwitcher#ensurePreconditions(PubSubTopic, PubSubTopic, Store, Optional)} */ - getRealTimeTopic(clusterName, storeName, partitionCount); + // getRealTimeTopic(clusterName, storeName, partitionCount); + System.out.println("getRealTimeTopic"); } PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic, partitionCount) diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java index 58cf6c2ab1..af876f3884 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.java @@ -137,19 +137,10 @@ void ensurePreconditions( Version version = store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName())); /** - * TopicReplicator is used in child fabrics to create real-time (RT) topic when a child fabric - * is ready to start buffer replay but RT topic doesn't exist. This scenario could happen for a - * hybrid store when users haven't started any Samza job yet. In this case, RT topic should be - * created with proper retention time instead of the default 5 days retention. - * - * Potential race condition: If both rewind-time update operation and buffer-replay - * start at the same time, RT topic might not be created with the expected retention time, - * which can be fixed by sending another rewind-time update command. - * - * TODO: RT topic should be created in both parent and child fabrics when the store is converted to - * hybrid (update store command handling). However, if a store is converted to hybrid when it - * doesn't have any existing version or a correct storage quota, we cannot decide the partition - * number for it. + * We create the real-time topics when creating hybrid version for the first time. This is to ensure that the + * real-time topics are created with the correct partition count. Here we'll only check retention time and update + * it if necessary. + * TODO: Remove topic creation logic from here once new code is deployed to all regions. */ createRealTimeTopicIfNeeded(store, version, srcTopicName, hybridStoreConfig.get()); if (version != null && version.isSeparateRealTimeTopicEnabled()) { @@ -289,7 +280,7 @@ public void switchToRealTimeTopic( } else { hybridStoreConfig = Optional.ofNullable(store.getHybridStoreConfig()); } - // ensurePreconditions(realTimeTopic, topicWhereToSendTheTopicSwitch, store, hybridStoreConfig); + ensurePreconditions(realTimeTopic, topicWhereToSendTheTopicSwitch, store, hybridStoreConfig); long rewindStartTimestamp = getRewindStartTime(version, hybridStoreConfig, version.getCreatedTime()); PubSubTopic finalTopicWhereToSendTheTopicSwitch = version.getPushType().isStreamReprocessing() ? pubSubTopicRepository.getTopic(Version.composeStreamReprocessingTopic(store.getName(), version.getNumber()))