Skip to content

Commit

Permalink
[da-vinci] Improve NativeMetadataRepository schema retrieval efficien…
Browse files Browse the repository at this point in the history
…cy (#751)

Currently metadata refresh in DVC would generate many unnecessary schema requests to the
meta system store. The severity increases with the number of DVC instances and the number
of value schema versions for the subscribed stores. Since value schemas are immutable in
Venice we will now:

1. Only retrieve value schemas that we don't have and reuse the SchemaData object that we
cache in the NativeMetadataRepository.
  • Loading branch information
xunyin8 authored Nov 15, 2023
1 parent a727950 commit 3cf409e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,23 +430,33 @@ protected StoreConfig getStoreConfigFromMetaSystemStore(String storeName) {

// Helper function with common code for retrieving SchemaData from meta system store.
protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
StoreMetaKey keySchemaKey =
MetaStoreDataType.STORE_KEY_SCHEMAS.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
SchemaData schemaData = schemaMap.get(storeName);
SchemaEntry keySchema;
if (schemaData == null) {
// Retrieve the key schema and initialize SchemaData only if it's not cached yet.
StoreMetaKey keySchemaKey = MetaStoreDataType.STORE_KEY_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> keySchemaMap =
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
if (keySchemaMap.isEmpty()) {
throw new VeniceException("No key schema found for store: " + storeName);
}
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
keySchema =
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
schemaData = new SchemaData(storeName, keySchema);
}
StoreMetaKey valueSchemaKey = MetaStoreDataType.STORE_VALUE_SCHEMAS
.getStoreMetaKey(Collections.singletonMap(KEY_STRING_STORE_NAME, storeName));
Map<CharSequence, CharSequence> keySchemaMap =
getStoreMetaValue(storeName, keySchemaKey).storeKeySchemas.keySchemaMap;
if (keySchemaMap.isEmpty()) {
throw new VeniceException("No key schema found for store: " + storeName);
}
Map.Entry<CharSequence, CharSequence> keySchemaEntry = keySchemaMap.entrySet().iterator().next();
SchemaEntry keySchema =
new SchemaEntry(Integer.parseInt(keySchemaEntry.getKey().toString()), keySchemaEntry.getValue().toString());
SchemaData schemaData = new SchemaData(storeName, keySchema);
Map<CharSequence, CharSequence> valueSchemaMap =
getStoreMetaValue(storeName, valueSchemaKey).storeValueSchemas.valueSchemaMap;
// Check the value schema string, if it's empty then try to query the other key space for individual value schema.
for (Map.Entry<CharSequence, CharSequence> entry: valueSchemaMap.entrySet()) {
// Check if we already have the corresponding value schema
int valueSchemaId = Integer.parseInt(entry.getKey().toString());
if (schemaData.getValueSchema(valueSchemaId) != null) {
continue;
}
if (entry.getValue().toString().isEmpty()) {
// The value schemas might be too large to be stored in a single K/V.
StoreMetaKey individualValueSchemaKey =
Expand All @@ -460,10 +470,9 @@ protected SchemaData getSchemaDataFromMetaSystemStore(String storeName) {
// the individual value schema key space.
String valueSchema =
getStoreMetaValue(storeName, individualValueSchemaKey).storeValueSchema.valueSchema.toString();
schemaData.addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), valueSchema));
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, valueSchema));
} else {
schemaData
.addValueSchema(new SchemaEntry(Integer.parseInt(entry.getKey().toString()), entry.getValue().toString()));
schemaData.addValueSchema(new SchemaEntry(valueSchemaId, entry.getValue().toString()));
}
}
return schemaData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
import static org.mockito.Mockito.when;

import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.schemas.TestKeyRecord;
import com.linkedin.venice.client.store.schemas.TestValueRecord;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreKeySchemas;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.HashMap;
import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -56,7 +63,45 @@ public void testGetSchemaDataFromReadThroughCache() throws InterruptedException
Assert.assertNotNull(nmr.getKeySchema(STORE_NAME));
}

@Test
public void testGetSchemaDataEfficiently() throws InterruptedException {
doReturn(Long.MAX_VALUE).when(backendConfig)
.getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong());
TestNMR nmr = new TestNMR(clientConfig, backendConfig);
nmr.start();
Assert.assertEquals(nmr.keySchemaRequestCount, 0);
Assert.assertEquals(nmr.valueSchemasRequestCount, 0);
Assert.assertEquals(nmr.specificValueSchemaRequestCount, 0);
nmr.subscribe(STORE_NAME);
Assert.assertEquals(nmr.keySchemaRequestCount, 1);
Assert.assertEquals(nmr.valueSchemasRequestCount, 1);
Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1);
Assert.assertNotNull(nmr.getKeySchema(STORE_NAME));
Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1));
// Refresh the store, we are not expecting to retrieve key schema or any specific value schema again.
nmr.refreshOneStore(STORE_NAME);
Assert.assertEquals(nmr.keySchemaRequestCount, 1);
Assert.assertEquals(nmr.valueSchemasRequestCount, 2);
Assert.assertEquals(nmr.specificValueSchemaRequestCount, 1);
Assert.assertNotNull(nmr.getKeySchema(STORE_NAME));
Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1));
// Refresh the store a few more times to retrieve value schema v2
for (int i = 0; i < 10; i++) {
nmr.refreshOneStore(STORE_NAME);
}
Assert.assertEquals(nmr.keySchemaRequestCount, 1);
Assert.assertEquals(nmr.valueSchemasRequestCount, 12);
Assert.assertEquals(nmr.specificValueSchemaRequestCount, 2);
Assert.assertNotNull(nmr.getKeySchema(STORE_NAME));
Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 1));
Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 2));
}

static class TestNMR extends NativeMetadataRepository {
int keySchemaRequestCount = 0;
int valueSchemasRequestCount = 0;
int specificValueSchemaRequestCount = 0;

protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig) {
super(clientConfig, backendConfig);
}
Expand All @@ -78,14 +123,37 @@ protected Store getStoreFromSystemStore(String storeName, String clusterName) {

@Override
protected StoreMetaValue getStoreMetaValue(String storeName, StoreMetaKey key) {
return null;
StoreMetaValue storeMetaValue = new StoreMetaValue();
MetaStoreDataType metaStoreDataType = MetaStoreDataType.valueOf(key.metadataType);
switch (metaStoreDataType) {
case STORE_KEY_SCHEMAS:
Map<CharSequence, CharSequence> keySchemaMap = new HashMap<>();
keySchemaMap.put(String.valueOf(1), TestKeyRecord.SCHEMA$.toString());
storeMetaValue.storeKeySchemas = new StoreKeySchemas(keySchemaMap);
keySchemaRequestCount++;
break;
case STORE_VALUE_SCHEMAS:
Map<CharSequence, CharSequence> valueSchemaMap = new HashMap<>();
valueSchemaMap.put(String.valueOf(1), "");
if (valueSchemasRequestCount > 1) {
valueSchemaMap.put(String.valueOf(2), "");
}
storeMetaValue.storeValueSchemas = new StoreValueSchemas(valueSchemaMap);
valueSchemasRequestCount++;
break;
case STORE_VALUE_SCHEMA:
storeMetaValue.storeValueSchema = new StoreValueSchema(TestValueRecord.SCHEMA$.toString());
specificValueSchemaRequestCount++;
break;
default:
// do nothing
}
return storeMetaValue;
}

@Override
protected SchemaData getSchemaDataFromSystemStore(String storeName) {
SchemaData schemaData = mock(SchemaData.class);
when(schemaData.getKeySchema()).thenReturn(mock(SchemaEntry.class));
return schemaData;
return getSchemaDataFromMetaSystemStore(storeName);
}
}
}

0 comments on commit 3cf409e

Please sign in to comment.