Skip to content

Commit

Permalink
Do not call getRealTimeTopic
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Dec 12, 2024
1 parent c709994 commit 6547aa6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3539,7 +3539,8 @@ public Version getReferenceVersionForStreamingWrites(String clusterName, String
* Create RT topic here in parent region. For child regions, RT should always be created as part of
* {@link RealTimeTopicSwitcher#ensurePreconditions(PubSubTopic, PubSubTopic, Store, Optional)}
*/
getRealTimeTopic(clusterName, storeName, partitionCount);
// getRealTimeTopic(clusterName, storeName, partitionCount);
System.out.println("getRealTimeTopic");
}
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(rtTopic, partitionCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,10 @@ void ensurePreconditions(
Version version =
store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName()));
/**
* TopicReplicator is used in child fabrics to create real-time (RT) topic when a child fabric
* is ready to start buffer replay but RT topic doesn't exist. This scenario could happen for a
* hybrid store when users haven't started any Samza job yet. In this case, RT topic should be
* created with proper retention time instead of the default 5 days retention.
*
* Potential race condition: If both rewind-time update operation and buffer-replay
* start at the same time, RT topic might not be created with the expected retention time,
* which can be fixed by sending another rewind-time update command.
*
* TODO: RT topic should be created in both parent and child fabrics when the store is converted to
* hybrid (update store command handling). However, if a store is converted to hybrid when it
* doesn't have any existing version or a correct storage quota, we cannot decide the partition
* number for it.
* We create the real-time topics when creating hybrid version for the first time. This is to ensure that the
* real-time topics are created with the correct partition count. Here we'll only check retention time and update
* it if necessary.
* TODO: Remove topic creation logic from here once new code is deployed to all regions.
*/
createRealTimeTopicIfNeeded(store, version, srcTopicName, hybridStoreConfig.get());
if (version != null && version.isSeparateRealTimeTopicEnabled()) {
Expand Down Expand Up @@ -289,7 +280,7 @@ public void switchToRealTimeTopic(
} else {
hybridStoreConfig = Optional.ofNullable(store.getHybridStoreConfig());
}
// ensurePreconditions(realTimeTopic, topicWhereToSendTheTopicSwitch, store, hybridStoreConfig);
ensurePreconditions(realTimeTopic, topicWhereToSendTheTopicSwitch, store, hybridStoreConfig);
long rewindStartTimestamp = getRewindStartTime(version, hybridStoreConfig, version.getCreatedTime());
PubSubTopic finalTopicWhereToSendTheTopicSwitch = version.getPushType().isStreamReprocessing()
? pubSubTopicRepository.getTopic(Version.composeStreamReprocessingTopic(store.getName(), version.getNumber()))
Expand Down

0 comments on commit 6547aa6

Please sign in to comment.