Skip to content

Commit

Permalink
add integration test to cover store changes
Browse files Browse the repository at this point in the history
  • Loading branch information
pthirun committed Dec 17, 2024
1 parent 09414c3 commit b158b5f
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
public class ReadOnlyStoreTest {
private static final Logger LOGGER = LogManager.getLogger(ReadOnlyStoreTest.class);

private static Random RANDOM;
private Random RANDOM;

@BeforeClass
public static void setup() {
public void setupReadOnlyStore() {
long seed = System.nanoTime();
RANDOM = new Random(seed);
LOGGER.info("Random seed set: {}", seed);
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testCloneStoreProperties() {
assertEquals(storeProperties.getNearlineProducerCountPerWriter(), store.getNearlineProducerCountPerWriter());
}

private static ZKStore populateZKStore(ZKStore store) {
private ZKStore populateZKStore(ZKStore store) {
store.setCurrentVersion(RANDOM.nextInt());
store.setPartitionCount(RANDOM.nextInt());
store.setLowWatermark(RANDOM.nextLong());
Expand Down Expand Up @@ -158,7 +158,7 @@ private static ZKStore populateZKStore(ZKStore store) {
return store;
}

private static void assertEqualHybridConfig(StoreHybridConfig actual, HybridStoreConfig expected) {
private void assertEqualHybridConfig(StoreHybridConfig actual, HybridStoreConfig expected) {
assertEquals(actual.getRewindTimeInSeconds(), expected.getRewindTimeInSeconds());
assertEquals(actual.getOffsetLagThresholdToGoOnline(), expected.getOffsetLagThresholdToGoOnline());
assertEquals(
Expand Down Expand Up @@ -188,13 +188,13 @@ private static void assertEqualViewConfig(
}
}

private static void assertEqualsETLStoreConfig(StoreETLConfig actual, ETLStoreConfig expected) {
private void assertEqualsETLStoreConfig(StoreETLConfig actual, ETLStoreConfig expected) {
assertEquals(actual.getEtledUserProxyAccount(), expected.getEtledUserProxyAccount());
assertEquals(actual.getFutureVersionETLEnabled(), expected.isFutureVersionETLEnabled());
assertEquals(actual.getRegularVersionETLEnabled(), expected.isFutureVersionETLEnabled());
}

private static void assertEqualsPartitionerConfig(StorePartitionerConfig actual, PartitionerConfig expected) {
private void assertEqualsPartitionerConfig(StorePartitionerConfig actual, PartitionerConfig expected) {

assertEquals(actual.getPartitionerClass(), expected.getPartitionerClass());
assertEquals(actual.getAmplificationFactor(), expected.getAmplificationFactor());
Expand All @@ -207,7 +207,7 @@ private static void assertEqualsPartitionerConfig(StorePartitionerConfig actual,
}
}

private static void assertEqualsSystemStores(
private void assertEqualsSystemStores(
Map<CharSequence, SystemStoreProperties> actual,
Map<String, SystemStoreAttributes> expected) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,82 +2,165 @@

import static org.testng.Assert.*;

import com.fasterxml.jackson.databind.ObjectWriter;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.fastclient.utils.AbstractClientEndToEndSetup;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.metadata.response.StorePropertiesResponseRecord;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.SslUtils;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestServerStorePropertiesEndpoint extends AbstractClientEndToEndSetup {
private static final Logger LOGGER = LogManager.getLogger(TestServerStorePropertiesEndpoint.class);

private Random RANDOM;

private Optional<SSLFactory> sslFactory;
private VeniceServerWrapper veniceServerWrapper;
private Admin veniceClusterAdmin;
private String serverUrl;

@BeforeClass
public void beforeClassServerStoreProperties() {
long seed = System.nanoTime();
RANDOM = new Random(seed);
LOGGER.info("Random seed set: {}", seed);
}

@BeforeMethod
public void beforeMethodServerStoreProperties() {
sslFactory = Optional.of(SslUtils.getVeniceLocalSslFactory());
veniceServerWrapper = veniceCluster.getVeniceServers().stream().findAny().get();
veniceClusterAdmin = veniceCluster.getLeaderVeniceController().getVeniceAdmin();
serverUrl = "https://" + veniceServerWrapper.getHost() + ":" + veniceServerWrapper.getPort();
}

@AfterMethod
public void afterMethodServerStoreProperties() {
veniceServerWrapper.close();
veniceCluster.close();
}

@Test(timeOut = TIME_OUT)
public void testRequestBasedStoreProperties() throws Exception {

// SSL
Optional<SSLFactory> sslFactory = Optional.of(SslUtils.getVeniceLocalSslFactory());
Store store = veniceClusterAdmin.getStore(veniceCluster.getClusterName(), storeName);

StorePropertiesResponseRecord record = getStorePropertiesResponseRecord(storeName);

assertStorePropertiesResponseRecord(record, store);
}

@Test(timeOut = TIME_OUT)
public void testRequestBasedStorePropertiesWithStoreChanges() throws Exception {

String clusterName = veniceCluster.getClusterName();

String owner = Long.toString(RANDOM.nextLong());
int largestUsedVersion = RANDOM.nextInt();

// Server
VeniceServerWrapper veniceServerWrapper = veniceCluster.getVeniceServers().stream().findAny().get();
String serverUrl = "https://" + veniceServerWrapper.getHost() + ":" + veniceServerWrapper.getPort();
veniceCluster.getLeaderVeniceController().getVeniceAdmin().setStoreOwner(clusterName, storeName, owner);
veniceCluster.getLeaderVeniceController()
.getVeniceAdmin()
.setStoreLargestUsedVersion(clusterName, storeName, largestUsedVersion);

StorePropertiesResponseRecord record = getStorePropertiesResponseRecord(storeName);

assertEquals(record.storeMetaValue.storeProperties.owner.toString(), owner);
assertEquals(record.storeMetaValue.storeProperties.largestUsedVersionNumber, largestUsedVersion);

Store store = veniceClusterAdmin.getStore(clusterName, storeName);
assertStorePropertiesResponseRecord(record, store);
}

private StorePropertiesResponseRecord getStorePropertiesResponseRecord(String _storeName) throws Exception {
StorePropertiesResponseRecord record;

// Request
ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(serverUrl);
ClientConfig clientConfig = ClientConfig.defaultGenericClientConfig(_storeName).setVeniceURL(serverUrl);
clientConfig.setSslFactory(sslFactory.get());
TransportClient transportClient = ClientFactory.getTransportClient(clientConfig);
String requestUrl = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + storeName;
String requestUrl = QueryAction.STORE_PROPERTIES.toString().toLowerCase() + "/" + _storeName;
TransportClientResponse response = transportClient.get(requestUrl).get();

// Deserialize
Schema writerSchema = StorePropertiesResponseRecord.SCHEMA$;
RecordDeserializer<StorePropertiesResponseRecord> recordDeserializer = FastSerializerDeserializerFactory
.getFastAvroSpecificDeserializer(writerSchema, StorePropertiesResponseRecord.class);
StorePropertiesResponseRecord record = recordDeserializer.deserialize(response.getBody());
record = recordDeserializer.deserialize(response.getBody());

ObjectWriter jsonWriter = ObjectMapperFactory.getInstance().writerWithDefaultPrettyPrinter();
Object printObject = ObjectMapperFactory.getInstance().readValue(record.toString(), Object.class);
System.out.println(jsonWriter.writeValueAsString(printObject));
return record;
}

// Assert
Store expectedStore =
veniceCluster.getLeaderVeniceController().getVeniceAdmin().getStore(veniceCluster.getClusterName(), storeName);
private void assertStorePropertiesResponseRecord(StorePropertiesResponseRecord record, Store store) {

// Assert
assertNotNull(record);
assertNotNull(record.storeMetaValue);
assertNotNull(record.storeMetaValue.storeProperties);
assertEquals(record.storeMetaValue.storeProperties.name.toString(), expectedStore.getName());
assertEquals(record.storeMetaValue.storeProperties.owner.toString(), expectedStore.getOwner());
assertEquals(record.storeMetaValue.storeProperties.createdTime, expectedStore.getCreatedTime());
assertEquals(record.storeMetaValue.storeProperties.currentVersion, expectedStore.getCurrentVersion());
assertEquals(record.storeMetaValue.storeProperties.storageQuotaInByte, expectedStore.getStorageQuotaInByte());
assertEquals(record.storeMetaValue.storeProperties.readQuotaInCU, expectedStore.getReadQuotaInCU());
assertEquals(
record.storeMetaValue.storeProperties.largestUsedVersionNumber,
expectedStore.getLargestUsedVersionNumber());

// Store Properties
assertEquals(record.storeMetaValue.storeProperties.name.toString(), store.getName());
assertEquals(record.storeMetaValue.storeProperties.owner.toString(), store.getOwner());
assertEquals(record.storeMetaValue.storeProperties.createdTime, store.getCreatedTime());
assertEquals(record.storeMetaValue.storeProperties.currentVersion, store.getCurrentVersion());
assertEquals(record.storeMetaValue.storeProperties.partitionCount, store.getPartitionCount());
assertEquals(record.storeMetaValue.storeProperties.lowWatermark, store.getLowWatermark());
assertEquals(record.storeMetaValue.storeProperties.enableWrites, store.isEnableWrites());
assertEquals(record.storeMetaValue.storeProperties.enableReads, store.isEnableReads());
assertEquals(record.storeMetaValue.storeProperties.storageQuotaInByte, store.getStorageQuotaInByte());
assertEquals(record.storeMetaValue.storeProperties.readQuotaInCU, store.getReadQuotaInCU());
assertEquals(record.storeMetaValue.storeProperties.batchGetLimit, store.getBatchGetLimit());
assertEquals(record.storeMetaValue.storeProperties.largestUsedVersionNumber, store.getLargestUsedVersionNumber());
assertEquals(
record.storeMetaValue.storeProperties.latestVersionPromoteToCurrentTimestamp,
expectedStore.getLatestVersionPromoteToCurrentTimestamp());
assertEquals(record.storeMetaValue.storeProperties.versions.size(), expectedStore.getVersions().size());
assertEquals(record.storeMetaValue.storeProperties.systemStores.size(), expectedStore.getSystemStores().size());
store.getLatestVersionPromoteToCurrentTimestamp());
assertEquals(record.storeMetaValue.storeProperties.versions.size(), store.getVersions().size());
assertEquals(record.storeMetaValue.storeProperties.systemStores.size(), store.getSystemStores().size());

// Store Key Schemas
assertNotNull(record.storeMetaValue.storeKeySchemas);
assertNotNull(record.storeMetaValue.storeKeySchemas.keySchemaMap);
for (Map.Entry<CharSequence, CharSequence> entry: record.storeMetaValue.storeKeySchemas.keySchemaMap.entrySet()) {
SchemaEntry expectedKeySchemaEntry = veniceClusterAdmin.getKeySchema(veniceCluster.getClusterName(), storeName);

String actual = entry.getValue().toString();
String expected = expectedKeySchemaEntry.getSchema().toString();
assertEquals(actual, expected);
}

// Store Value Schemas
assertNotNull(record.storeMetaValue.storeValueSchemas);
for (Map.Entry<CharSequence, CharSequence> entry: record.storeMetaValue.storeValueSchemas.valueSchemaMap
.entrySet()) {
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
SchemaEntry expectedValueSchemaEntry =
veniceClusterAdmin.getValueSchema(veniceCluster.getClusterName(), storeName, valueSchemaId);

String actual = entry.getValue().toString();
String expected = expectedValueSchemaEntry.getSchema().toString();
assertEquals(actual, expected);
}
assertNotNull(record.helixGroupInfo);
assertNotNull(record.routingInfo);

// Close
veniceServerWrapper.close();
veniceCluster.close();
}
}

0 comments on commit b158b5f

Please sign in to comment.