Skip to content

Commit

Permalink
Address review comments and more test
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Dec 18, 2024
1 parent 27c6cc4 commit 7f27513
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@ public RequestTopicForPushRequest(String clusterName, String storeName, PushType
this.pushJobId = pushJobId;
}

public static PushType extractPushType(String pushTypeString) {
try {
return PushType.valueOf(pushTypeString);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
pushTypeString + " is an invalid push type. Valid push types are: " + Arrays.toString(PushType.values()));
}
}

public String getClusterName() {
return clusterName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
import com.linkedin.venice.systemstore.schemas.StoreVersion;
import com.linkedin.venice.views.VeniceView;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;


/**
Expand Down Expand Up @@ -41,11 +40,21 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
*/
enum PushType {
BATCH(0), // Batch jobs will create a new version topic and write to it in a batch manner.
STREAM_REPROCESSING(1), // reprocessing jobs will create a new version topic and a reprocessing topic.
STREAM_REPROCESSING(1), // Reprocessing jobs will create a new version topic and a reprocessing topic.
STREAM(2), // Stream jobs will write to a buffer or RT topic.
INCREMENTAL(3); // Incremental jobs will re-use an existing version topic and write on top of it.

private final int value;
private static final Map<Integer, PushType> VALUE_TO_TYPE_MAP = new HashMap<>(4);
private static final Map<String, PushType> NAME_TO_TYPE_MAP = new HashMap<>(4);

// Static initializer for map population
static {
for (PushType type: PushType.values()) {
VALUE_TO_TYPE_MAP.put(type.value, type);
NAME_TO_TYPE_MAP.put(type.name(), type);
}
}

PushType(int value) {
this.value = value;
Expand All @@ -68,15 +77,41 @@ public boolean isStreamReprocessing() {
}

public boolean isBatchOrStreamReprocessing() {
return isBatch() || isStreamReprocessing();
return this == BATCH || this == STREAM_REPROCESSING;
}

/**
* Retrieve the PushType based on its integer value.
*
* @param value the integer value of the PushType
* @return the corresponding PushType
* @throws VeniceException if the value is invalid
*/
public static PushType valueOf(int value) {
Optional<PushType> pushType = Arrays.stream(values()).filter(p -> p.value == value).findFirst();
if (!pushType.isPresent()) {
PushType pushType = VALUE_TO_TYPE_MAP.get(value);
if (pushType == null) {
throw new VeniceException("Invalid push type with int value: " + value);
}
return pushType.get();
return pushType;
}

/**
* Extracts the PushType from its string name.
*
* @param pushTypeString the string representation of the PushType
* @return the corresponding PushType
* @throws IllegalArgumentException if the string is invalid
*/
public static PushType extractPushType(String pushTypeString) {
PushType pushType = NAME_TO_TYPE_MAP.get(pushTypeString);
if (pushType == null) {
throw new IllegalArgumentException(
String.format(
"%s is an invalid push type. Valid push types are: %s",
pushTypeString,
String.join(", ", NAME_TO_TYPE_MAP.keySet())));
}
return pushType;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.venice.controllerapi;

import static com.linkedin.venice.meta.Version.PushType.BATCH;
import static com.linkedin.venice.meta.Version.PushType.STREAM;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -53,18 +52,6 @@ public void testRequestTopicForPushRequestConstructorArgs() {
assertEquals(ex4.getMessage(), "pushJobId is required");
}

@Test
public void testExtractPushTypeValidAndInvalidValues() {
// Valid cases
assertEquals(RequestTopicForPushRequest.extractPushType("BATCH"), BATCH);
assertEquals(RequestTopicForPushRequest.extractPushType("STREAM"), STREAM);

// Invalid case
IllegalArgumentException ex = Assert
.expectThrows(IllegalArgumentException.class, () -> RequestTopicForPushRequest.extractPushType("INVALID"));
assertTrue(ex.getMessage().contains("INVALID is an invalid push type"));
}

@Test
public void testRequestTopicForPushRequestSettersAndGetters() {
request.setSendStartOfPush(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package com.linkedin.venice.meta;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version.PushType;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import java.io.IOException;
Expand All @@ -25,7 +31,7 @@ public class TestVersion {
@Test
public void identifiesValidTopicNames() {
String goodTopic = "my_very_good_store_v4";
Assert.assertTrue(
assertTrue(
Version.isVersionTopicOrStreamReprocessingTopic(goodTopic),
goodTopic + " should parse as a valid store-version topic");

Expand All @@ -41,7 +47,7 @@ public void serializes() throws IOException {
int versionNumber = 17;
Version version = new VersionImpl(storeName, versionNumber);
String serialized = OBJECT_MAPPER.writeValueAsString(version);
Assert.assertTrue(serialized.contains(storeName));
assertTrue(serialized.contains(storeName));
}

/**
Expand All @@ -52,21 +58,21 @@ public void serializes() throws IOException {
@Test
public void deserializeWithWrongFields() throws IOException {
Version oldParsedVersion = OBJECT_MAPPER.readValue(OLD_SERIALIZED, Version.class);
Assert.assertEquals(oldParsedVersion.getStoreName(), "store-1492637190910-78714331");
assertEquals(oldParsedVersion.getStoreName(), "store-1492637190910-78714331");

Version newParsedVersion = OBJECT_MAPPER.readValue(EXTRA_FIELD_SERIALIZED, Version.class);
Assert.assertEquals(newParsedVersion.getStoreName(), "store-1492637190910-12345678");
assertEquals(newParsedVersion.getStoreName(), "store-1492637190910-12345678");

Version legacyParsedVersion = OBJECT_MAPPER.readValue(MISSING_FIELD_SERIALIZED, Version.class);
Assert.assertEquals(legacyParsedVersion.getStoreName(), "store-missing");
Assert.assertNotNull(legacyParsedVersion.getPushJobId()); // missing final field can still deserialize, just gets
// arbitrary value from constructor
assertEquals(legacyParsedVersion.getStoreName(), "store-missing");
assertNotNull(legacyParsedVersion.getPushJobId()); // missing final field can still deserialize, just gets
// arbitrary value from constructor
}

@Test
public void testParseStoreFromRealTimeTopic() {
String validRealTimeTopic = "abc_rt";
Assert.assertEquals(Version.parseStoreFromRealTimeTopic(validRealTimeTopic), "abc");
assertEquals(Version.parseStoreFromRealTimeTopic(validRealTimeTopic), "abc");
String invalidRealTimeTopic = "abc";
try {
Version.parseStoreFromRealTimeTopic(invalidRealTimeTopic);
Expand All @@ -80,19 +86,19 @@ public void testParseStoreFromRealTimeTopic() {
public void testIsTopic() {
String topic = "abc_rt";
Assert.assertFalse(Version.isVersionTopic(topic));
Assert.assertTrue(Version.isRealTimeTopic(topic));
assertTrue(Version.isRealTimeTopic(topic));
topic = "abc";
Assert.assertFalse(Version.isVersionTopic(topic));
topic = "abc_v12df";
Assert.assertFalse(Version.isVersionTopic(topic));
topic = "abc_v123";
Assert.assertTrue(Version.isVersionTopic(topic));
assertTrue(Version.isVersionTopic(topic));
Assert.assertFalse(Version.isRealTimeTopic(topic));
Assert.assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic));
assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic));
topic = "abc_v123_sr";
Assert.assertFalse(Version.isVersionTopic(topic));
Assert.assertTrue(Version.isStreamReprocessingTopic(topic));
Assert.assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic));
assertTrue(Version.isStreamReprocessingTopic(topic));
assertTrue(Version.isVersionTopicOrStreamReprocessingTopic(topic));
topic = "abc_v12ab3_sr";
Assert.assertFalse(Version.isVersionTopic(topic));
Assert.assertFalse(Version.isStreamReprocessingTopic(topic));
Expand All @@ -108,41 +114,99 @@ public void testIsATopicThatIsVersioned() {
String topic = "abc_rt";
Assert.assertFalse(Version.isATopicThatIsVersioned(topic));
topic = "abc_v1_sr";
Assert.assertTrue(Version.isATopicThatIsVersioned(topic));
assertTrue(Version.isATopicThatIsVersioned(topic));
topic = "abc_v1";
Assert.assertTrue(Version.isATopicThatIsVersioned(topic));
assertTrue(Version.isATopicThatIsVersioned(topic));
topic = "abc_v1_cc";
Assert.assertTrue(Version.isATopicThatIsVersioned(topic));
assertTrue(Version.isATopicThatIsVersioned(topic));
}

@Test
public void testParseStoreFromKafkaTopicName() {
String storeName = "abc";
String topic = "abc_rt";
Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
topic = "abc_v1";
Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
topic = "abc_v1_cc";
Assert.assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
assertEquals(Version.parseStoreFromKafkaTopicName(topic), storeName);
}

@Test
public void testParseVersionFromKafkaTopicName() {
int version = 1;
String topic = "abc_v1";
Assert.assertEquals(Version.parseVersionFromVersionTopicName(topic), version);
assertEquals(Version.parseVersionFromVersionTopicName(topic), version);
topic = "abc_v1_cc";
Assert.assertEquals(Version.parseVersionFromKafkaTopicName(topic), version);
assertEquals(Version.parseVersionFromKafkaTopicName(topic), version);
}

@Test
void testVersionStatus() {
for (VersionStatus status: VersionStatus.values()) {
if (status == VersionStatus.KILLED) {
Assert.assertTrue(VersionStatus.isVersionKilled(status));
assertTrue(VersionStatus.isVersionKilled(status));
} else {
Assert.assertFalse(VersionStatus.isVersionKilled(status));
}
}
}

@Test
public void testExtractPushType() {
// Case 1: Valid push types
assertEquals(PushType.extractPushType("BATCH"), PushType.BATCH);
assertEquals(PushType.extractPushType("STREAM_REPROCESSING"), PushType.STREAM_REPROCESSING);
assertEquals(PushType.extractPushType("STREAM"), PushType.STREAM);
assertEquals(PushType.extractPushType("INCREMENTAL"), PushType.INCREMENTAL);

// Case 2: Invalid push type
String invalidType = "INVALID_TYPE";
IllegalArgumentException invalidException =
expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(invalidType));
assertTrue(invalidException.getMessage().contains(invalidType));
assertTrue(invalidException.getMessage().contains("Valid push types are"));

// Case 3: Case sensitivity
String lowerCaseType = "batch";
IllegalArgumentException caseException =
expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(lowerCaseType));
assertTrue(caseException.getMessage().contains(lowerCaseType));

// Case 4: Empty string
String emptyInput = "";
IllegalArgumentException emptyException =
expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(emptyInput));
assertTrue(emptyException.getMessage().contains(emptyInput));

// Case 5: Null input
IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class, () -> PushType.extractPushType(null));
assertNotNull(exception);
}

@Test
public void testValueOfIntReturnsPushType() {
// Case 1: Valid integer values
assertEquals(PushType.valueOf(0), PushType.BATCH);
assertEquals(PushType.valueOf(1), PushType.STREAM_REPROCESSING);
assertEquals(PushType.valueOf(2), PushType.STREAM);
assertEquals(PushType.valueOf(3), PushType.INCREMENTAL);

// Case 2: Invalid integer value (negative)
int invalidNegative = -1;
VeniceException negativeException = expectThrows(VeniceException.class, () -> PushType.valueOf(invalidNegative));
assertTrue(negativeException.getMessage().contains("Invalid push type with int value: " + invalidNegative));

// Case 3: Invalid integer value (positive out of range)
int invalidPositive = 999;
VeniceException positiveException = expectThrows(VeniceException.class, () -> PushType.valueOf(invalidPositive));
assertTrue(positiveException.getMessage().contains("Invalid push type with int value: " + invalidPositive));

// Case 4: Edge case - Valid minimum value
assertEquals(PushType.valueOf(0), PushType.BATCH);

// Case 5: Edge case - Valid maximum value
assertEquals(PushType.valueOf(3), PushType.INCREMENTAL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public Route requestTopicForPushing(Admin admin) {
RequestTopicForPushRequest requestTopicForPushRequest = new RequestTopicForPushRequest(
request.queryParams(CLUSTER),
request.queryParams(NAME),
RequestTopicForPushRequest.extractPushType(request.queryParams(PUSH_TYPE)),
PushType.extractPushType(request.queryParams(PUSH_TYPE)),
request.queryParams(PUSH_JOB_ID));

// populate the request object with optional parameters
Expand Down

0 comments on commit 7f27513

Please sign in to comment.