diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequest.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequest.java index 5d14b42ca6..580ee2de74 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequest.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequest.java @@ -49,15 +49,6 @@ public RequestTopicForPushRequest(String clusterName, String storeName, PushType this.pushJobId = pushJobId; } - public static PushType extractPushType(String pushTypeString) { - try { - return PushType.valueOf(pushTypeString); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - pushTypeString + " is an invalid push type. Valid push types are: " + Arrays.toString(PushType.values())); - } - } - public String getClusterName() { return clusterName; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 406422e8ed..1f9f08e42e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -10,10 +10,9 @@ import com.linkedin.venice.systemstore.schemas.StoreVersion; import com.linkedin.venice.views.VeniceView; import java.time.Duration; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; /** @@ -41,11 +40,21 @@ public interface Version extends Comparable, DataModelBackedStructure VALUE_TO_TYPE_MAP = new HashMap<>(4); + private static final Map NAME_TO_TYPE_MAP = new HashMap<>(4); + + // Static initializer for map population + static { + for (PushType type: PushType.values()) { + VALUE_TO_TYPE_MAP.put(type.value, type); + NAME_TO_TYPE_MAP.put(type.name(), type); + } + } PushType(int value) { this.value = value; @@ -68,15 +77,41 @@ public boolean isStreamReprocessing() { } public boolean isBatchOrStreamReprocessing() { - return isBatch() || isStreamReprocessing(); + return this == BATCH || this == STREAM_REPROCESSING; } + /** + * Retrieve the PushType based on its integer value. + * + * @param value the integer value of the PushType + * @return the corresponding PushType + * @throws VeniceException if the value is invalid + */ public static PushType valueOf(int value) { - Optional pushType = Arrays.stream(values()).filter(p -> p.value == value).findFirst(); - if (!pushType.isPresent()) { + PushType pushType = VALUE_TO_TYPE_MAP.get(value); + if (pushType == null) { throw new VeniceException("Invalid push type with int value: " + value); } - return pushType.get(); + return pushType; + } + + /** + * Extracts the PushType from its string name. + * + * @param pushTypeString the string representation of the PushType + * @return the corresponding PushType + * @throws IllegalArgumentException if the string is invalid + */ + public static PushType extractPushType(String pushTypeString) { + PushType pushType = NAME_TO_TYPE_MAP.get(pushTypeString); + if (pushType == null) { + throw new IllegalArgumentException( + String.format( + "%s is an invalid push type. Valid push types are: %s", + pushTypeString, + String.join(", ", NAME_TO_TYPE_MAP.keySet()))); + } + return pushType; } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequestTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequestTest.java index 2c8b2596ee..34e4be44c8 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequestTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/controllerapi/RequestTopicForPushRequestTest.java @@ -1,7 +1,6 @@ package com.linkedin.venice.controllerapi; import static com.linkedin.venice.meta.Version.PushType.BATCH; -import static com.linkedin.venice.meta.Version.PushType.STREAM; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -53,18 +52,6 @@ public void testRequestTopicForPushRequestConstructorArgs() { assertEquals(ex4.getMessage(), "pushJobId is required"); } - @Test - public void testExtractPushTypeValidAndInvalidValues() { - // Valid cases - assertEquals(RequestTopicForPushRequest.extractPushType("BATCH"), BATCH); - assertEquals(RequestTopicForPushRequest.extractPushType("STREAM"), STREAM); - - // Invalid case - IllegalArgumentException ex = Assert - .expectThrows(IllegalArgumentException.class, () -> RequestTopicForPushRequest.extractPushType("INVALID")); - assertTrue(ex.getMessage().contains("INVALID is an invalid push type")); - } - @Test public void testRequestTopicForPushRequestSettersAndGetters() { request.setSendStartOfPush(true); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestVersion.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestVersion.java index e261b956b8..f0f25ca010 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestVersion.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestVersion.java @@ -1,7 +1,13 @@ package com.linkedin.venice.meta; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.Version.PushType; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.Utils; import java.io.IOException; @@ -25,7 +31,7 @@ public class TestVersion { @Test public void identifiesValidTopicNames() { String goodTopic = "my_very_good_store_v4"; - Assert.assertTrue( + assertTrue( Version.isVersionTopicOrStreamReprocessingTopic(goodTopic), goodTopic + " should parse as a valid store-version topic"); @@ -41,7 +47,7 @@ public void serializes() throws IOException { int versionNumber = 17; Version version = new VersionImpl(storeName, versionNumber); String serialized = OBJECT_MAPPER.writeValueAsString(version); - Assert.assertTrue(serialized.contains(storeName)); + assertTrue(serialized.contains(storeName)); } /** @@ -52,21 +58,21 @@ public void serializes() throws IOException { @Test public void deserializeWithWrongFields() throws IOException { Version oldParsedVersion = OBJECT_MAPPER.readValue(OLD_SERIALIZED, Version.class); - Assert.assertEquals(oldParsedVersion.getStoreName(), "store-1492637190910-78714331"); + assertEquals(oldParsedVersion.getStoreName(), "store-1492637190910-78714331"); Version newParsedVersion = OBJECT_MAPPER.readValue(EXTRA_FIELD_SERIALIZED, Version.class); - Assert.assertEquals(newParsedVersion.getStoreName(), "store-1492637190910-12345678"); + assertEquals(newParsedVersion.getStoreName(), "store-1492637190910-12345678"); Version legacyParsedVersion = OBJECT_MAPPER.readValue(MISSING_FIELD_SERIALIZED, Version.class); - Assert.assertEquals(legacyParsedVersion.getStoreName(), "store-missing"); - Assert.assertNotNull(legacyParsedVersion.getPushJobId()); // missing final field can still deserialize, just gets - // arbitrary value from constructor + assertEquals(legacyParsedVersion.getStoreName(), "store-missing"); + assertNotNull(legacyParsedVersion.getPushJobId()); // missing final field can still deserialize, just gets + // arbitrary value from constructor } @Test public void testParseStoreFromRealTimeTopic() { String validRealTimeTopic = "abc_rt"; - Assert.assertEquals(Version.parseStoreFromRealTimeTopic(validRealTimeTopic), "abc"); + assertEquals(Version.parseStoreFromRealTimeTopic(validRealTimeTopic), "abc"); String invalidRealTimeTopic = "abc"; try { Version.parseStoreFromRealTimeTopic(invalidRealTimeTopic); @@ -80,19 +86,19 @@ public void testParseStoreFromRealTimeTopic() { public void testIsTopic() { String topic = "abc_rt"; Assert.assertFalse(Version.isVersionTopic(topic)); - Assert.assertTrue(Version.isRealTimeTopic(topic)); + assertTrue(Version.isRealTimeTopic(topic)); topic = "abc"; Assert.assertFalse(Version.isVersionTopic(topic)); topic = "abc_v12df"; Assert.assertFalse(Version.isVersionTopic(topic)); topic = "abc_v123"; - Assert.assertTrue(Version.isVersionTopic(topic)); + assertTrue(Version.isVersionTopic(topic)); Assert.assertFalse(Version.isRealTimeTopic(topic)); - Assert.assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic)); + assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic)); topic = "abc_v123_sr"; Assert.assertFalse(Version.isVersionTopic(topic)); - Assert.assertTrue(Version.isStreamReprocessingTopic(topic)); - Assert.assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic)); + assertTrue(Version.isStreamReprocessingTopic(topic)); + assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic)); topic = "abc_v12ab3_sr"; Assert.assertFalse(Version.isVersionTopic(topic)); Assert.assertFalse(Version.isStreamReprocessingTopic(topic)); @@ -108,41 +114,99 @@ public void testIsATopicThatIsVersioned() { String topic = "abc_rt"; Assert.assertFalse(Version.isATopicThatIsVersioned(topic)); topic = "abc_v1_sr"; - Assert.assertTrue(Version.isATopicThatIsVersioned(topic)); + assertTrue(Version.isATopicThatIsVersioned(topic)); topic = "abc_v1"; - Assert.assertTrue(Version.isATopicThatIsVersioned(topic)); + assertTrue(Version.isATopicThatIsVersioned(topic)); topic = "abc_v1_cc"; - Assert.assertTrue(Version.isATopicThatIsVersioned(topic)); + assertTrue(Version.isATopicThatIsVersioned(topic)); } @Test public void testParseStoreFromKafkaTopicName() { String storeName = "abc"; String topic = "abc_rt"; - Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); + assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); topic = "abc_v1"; - Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); + assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); topic = "abc_v1_cc"; - Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); + assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName); } @Test public void testParseVersionFromKafkaTopicName() { int version = 1; String topic = "abc_v1"; - Assert.assertEquals(Version.parseVersionFromVersionTopicName(topic), version); + assertEquals(Version.parseVersionFromVersionTopicName(topic), version); topic = "abc_v1_cc"; - Assert.assertEquals(Version.parseVersionFromKafkaTopicName(topic), version); + assertEquals(Version.parseVersionFromKafkaTopicName(topic), version); } @Test void testVersionStatus() { for (VersionStatus status: VersionStatus.values()) { if (status == VersionStatus.KILLED) { - Assert.assertTrue(VersionStatus.isVersionKilled(status)); + assertTrue(VersionStatus.isVersionKilled(status)); } else { Assert.assertFalse(VersionStatus.isVersionKilled(status)); } } } + + @Test + public void testExtractPushType() { + // Case 1: Valid push types + assertEquals(PushType.extractPushType("BATCH"), PushType.BATCH); + assertEquals(PushType.extractPushType("STREAM_REPROCESSING"), PushType.STREAM_REPROCESSING); + assertEquals(PushType.extractPushType("STREAM"), PushType.STREAM); + assertEquals(PushType.extractPushType("INCREMENTAL"), PushType.INCREMENTAL); + + // Case 2: Invalid push type + String invalidType = "INVALID_TYPE"; + IllegalArgumentException invalidException = + expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(invalidType)); + assertTrue(invalidException.getMessage().contains(invalidType)); + assertTrue(invalidException.getMessage().contains("Valid push types are")); + + // Case 3: Case sensitivity + String lowerCaseType = "batch"; + IllegalArgumentException caseException = + expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(lowerCaseType)); + assertTrue(caseException.getMessage().contains(lowerCaseType)); + + // Case 4: Empty string + String emptyInput = ""; + IllegalArgumentException emptyException = + expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(emptyInput)); + assertTrue(emptyException.getMessage().contains(emptyInput)); + + // Case 5: Null input + IllegalArgumentException exception = + expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(null)); + assertNotNull(exception); + } + + @Test + public void testValueOfIntReturnsPushType() { + // Case 1: Valid integer values + assertEquals(PushType.valueOf(0), PushType.BATCH); + assertEquals(PushType.valueOf(1), PushType.STREAM_REPROCESSING); + assertEquals(PushType.valueOf(2), PushType.STREAM); + assertEquals(PushType.valueOf(3), PushType.INCREMENTAL); + + // Case 2: Invalid integer value (negative) + int invalidNegative = -1; + VeniceException negativeException = expectThrows(VeniceException.class, () -> PushType.valueOf(invalidNegative)); + assertTrue(negativeException.getMessage().contains("Invalid push type with int value: " + invalidNegative)); + + // Case 3: Invalid integer value (positive out of range) + int invalidPositive = 999; + VeniceException positiveException = expectThrows(VeniceException.class, () -> PushType.valueOf(invalidPositive)); + assertTrue(positiveException.getMessage().contains("Invalid push type with int value: " + invalidPositive)); + + // Case 4: Edge case - Valid minimum value + assertEquals(PushType.valueOf(0), PushType.BATCH); + + // Case 5: Edge case - Valid maximum value + assertEquals(PushType.valueOf(3), PushType.INCREMENTAL); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java index 9886dab753..86e807f212 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java @@ -474,7 +474,7 @@ public Route requestTopicForPushing(Admin admin) { RequestTopicForPushRequest requestTopicForPushRequest = new RequestTopicForPushRequest( request.queryParams(CLUSTER), request.queryParams(NAME), - RequestTopicForPushRequest.extractPushType(request.queryParams(PUSH_TYPE)), + PushType.extractPushType(request.queryParams(PUSH_TYPE)), request.queryParams(PUSH_JOB_ID)); // populate the request object with optional parameters