Skip to content

Commit

Permalink
add a new config that will update real time topic during partition co…
Browse files Browse the repository at this point in the history
…unt update request
  • Loading branch information
arjun4084346 committed Dec 18, 2024
1 parent 26edffc commit e127ce7
Show file tree
Hide file tree
Showing 7 changed files with 784 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2356,4 +2356,6 @@ private ConfigKeys() {

public static final String SERVER_DELETE_UNASSIGNED_PARTITIONS_ON_STARTUP =
"server.delete.unassigned.partitions.on.startup";

public static final String UPDATE_REAL_TIME_TOPIC = "update.real.time.topic";
}
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,29 @@ private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, Stri
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

public static String createNewRealTimeTopicName(String oldRealTimeTopicName) {
if (oldRealTimeTopicName == null || !oldRealTimeTopicName.endsWith(Version.REAL_TIME_TOPIC_SUFFIX)) {
throw new IllegalArgumentException("Invalid old name format");
}

// Extract the base name and current version
String base =
oldRealTimeTopicName.substring(0, oldRealTimeTopicName.length() - Version.REAL_TIME_TOPIC_SUFFIX.length());
String[] parts = base.split("_v");

String newBase;
if (parts.length == 2) {
// Increment the version
int version = Integer.parseInt(parts[1]) + 1;
newBase = parts[0] + "_v" + version;
} else {
// Start with version 2
newBase = base + "_v2";
}

return newBase + Version.REAL_TIME_TOPIC_SUFFIX;
}

private static class TimeUnitInfo {
String suffix;
int multiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,4 +393,47 @@ public void testGetLeaderTopicFromPubSubTopic() {
Utils.resolveLeaderTopicFromPubSubTopic(pubSubTopicRepository, separateRealTimeTopic),
realTimeTopic);
}

@Test
void testValidOldNameWithVersionIncrement() {
String oldName = "storeName_v1_rt";
String expectedNewName = "storeName_v2_rt";

String result = Utils.createNewRealTimeTopicName(oldName);

assertEquals(expectedNewName, result);
}

@Test
void testValidOldNameStartingNewVersion() {
String oldName = "storeName_rt";
String expectedNewName = "storeName_v2_rt";

String result = Utils.createNewRealTimeTopicName(oldName);

assertEquals(expectedNewName, result);
}

@Test
void testInvalidOldNameNull() {
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(null));
}

@Test
void testInvalidOldNameWithoutSuffix() {
String oldName = "storeName_v1";
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}

@Test
void testInvalidOldNameIncorrectFormat() {
String oldName = "storeName_v1_rt_extra";
assertThrows(IllegalArgumentException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}

@Test
void testInvalidOldNameWithNonNumericVersion() {
String oldName = "storeName_vX_rt";
assertThrows(NumberFormatException.class, () -> Utils.createNewRealTimeTopicName(oldName));
}
}
Loading

0 comments on commit e127ce7

Please sign in to comment.