Skip to content

Commit

Permalink
Reduce the number of controller requests sent by system store initial…
Browse files Browse the repository at this point in the history
…izer (#789)

Today controller client backed initializer calls get_value_or_derived_schema_id
API for every meta system store value schema, to check if its partial update
schema exists. But actually controller has get_all_value_and_derived_schema API
to get all schemas so that we can check if any schema exists. This commit
switches to use this API, which helps reduce the number of controller requests
from n to 1.
  • Loading branch information
shuhui-liu authored Jan 3, 2024
1 parent 57013c5 commit 16ed564
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -134,26 +137,33 @@ public void execute(Map<Integer, Schema> inputSchemas) {
}

MultiSchemaResponse multiSchemaResponse =
controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> c.getAllValueSchema(storeName));
controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> c.getAllValueAndDerivedSchema(storeName));
if (multiSchemaResponse.isError()) {
throw new VeniceException(
"Error when getting all value schemas from system store " + storeName + " in cluster " + clusterName
+ " after retries. Error: " + multiSchemaResponse.getError());
}
Map<Integer, Schema> schemasInZk = new HashMap<>();
Arrays.stream(multiSchemaResponse.getSchemas())
.forEach(schema -> schemasInZk.put(schema.getId(), AvroCompatibilityHelper.parse(schema.getSchemaStr())));
Map<Integer, Schema> valueSchemasInZk = new HashMap<>();
List<DerivedSchemaEntry> partialUpdateSchemasInZk = new ArrayList<>();
Arrays.stream(multiSchemaResponse.getSchemas()).forEach(schema -> {
if (schema.isDerivedSchema()) {
partialUpdateSchemasInZk
.add(new DerivedSchemaEntry(schema.getId(), schema.getDerivedSchemaId(), schema.getSchemaStr()));
} else {
valueSchemasInZk.put(schema.getId(), AvroCompatibilityHelper.parse(schema.getSchemaStr()));
}
});

if (isSchemaResourceInLocal) {
registerLocalSchemaResources(storeName, schemaResources, schemasInZk);
registerLocalSchemaResources(storeName, schemaResources, valueSchemasInZk, partialUpdateSchemasInZk);
} else {
// For passed in new schemas, its version could be larger than protocolDefinition.getCurrentProtocolVersion(),
// register schema directly.
for (Map.Entry<Integer, Schema> entry: schemaResources.entrySet()) {
checkAndMayRegisterValueSchema(
storeName,
entry.getKey(),
schemasInZk.get(entry.getKey()),
valueSchemasInZk.get(entry.getKey()),
entry.getValue(),
determineSchemaCompatabilityType());
}
Expand All @@ -163,7 +173,8 @@ public void execute(Map<Integer, Schema> inputSchemas) {
private void registerLocalSchemaResources(
String storeName,
Map<Integer, Schema> schemaResources,
Map<Integer, Schema> schemasInZk) {
Map<Integer, Schema> valueSchemasInZk,
List<DerivedSchemaEntry> partialUpdateSchemasInZk) {
for (int version = 1; version <= protocolDefinition.getCurrentProtocolVersion(); version++) {
Schema schemaInLocalResources = schemaResources.get(version);
if (schemaInLocalResources == null) {
Expand All @@ -175,22 +186,18 @@ private void registerLocalSchemaResources(
checkAndMayRegisterValueSchema(
storeName,
version,
schemasInZk.get(version),
valueSchemasInZk.get(version),
schemaInLocalResources,
determineSchemaCompatabilityType());

if (autoRegisterPartialUpdateSchema) {
checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources);
checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources, partialUpdateSchemasInZk);
}
}
}
}

DirectionalSchemaCompatibilityType determineSchemaCompatabilityType() {
if (protocolDefinition == AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE) {
return DirectionalSchemaCompatibilityType.FULL;
}

if (protocolDefinition == AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE) {
return DirectionalSchemaCompatibilityType.BACKWARD;
}
Expand Down Expand Up @@ -317,35 +324,35 @@ private void checkAndMayRegisterValueSchema(
private void checkAndMayRegisterPartialUpdateSchema(
String storeName,
int valueSchemaId,
Schema schemaInLocalResources) {
String partialUpdateSchema =
WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(schemaInLocalResources).toString();
SchemaResponse getSchemaResponse = controllerClient.retryableRequest(
DEFAULT_RETRY_TIMES,
c -> c.getValueOrDerivedSchemaId(storeName, partialUpdateSchema),
r -> r.getError().contains("Can not find any registered value schema nor derived schema"));
if (getSchemaResponse.isError()) {
if (getSchemaResponse.getError().contains("Can not find any registered value schema nor derived schema")) {
// The derived schema doesn't exist right now, try to register it.
SchemaResponse addDerivedSchemaResponse = controllerClient.retryableRequest(
DEFAULT_RETRY_TIMES,
c -> c.addDerivedSchema(storeName, valueSchemaId, partialUpdateSchema));
if (addDerivedSchemaResponse.isError()) {
throw new VeniceException(
"Error when adding derived schema for value schema v" + valueSchemaId + " to system store " + storeName
+ " in cluster " + clusterName + " after retries. Error: " + addDerivedSchemaResponse.getError());
}
Schema valueSchemaInLocalResources,
List<DerivedSchemaEntry> partialUpdateSchemasInZk) {
Schema partialUpdateSchema =
WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchemaInLocalResources);
String partialUpdateSchemaToFind = AvroCompatibilityHelper.toParsingForm(partialUpdateSchema);
for (DerivedSchemaEntry partialUpdateSchemaInZk: partialUpdateSchemasInZk) {
if (partialUpdateSchemaToFind.equals(partialUpdateSchemaInZk.getCanonicalSchemaStr())) {
LOGGER.info(
"Added derived schema v{} for value schema v{} to system store {}.",
addDerivedSchemaResponse.getDerivedSchemaId(),
valueSchemaId,
storeName);
} else {
throw new VeniceException(
"Error when getting derived schema from system store " + storeName + " in cluster " + clusterName
+ " after retries. Error: " + getSchemaResponse.getError());
"Partial update schema in system store {} is already registered as version {}-{}.",
storeName,
partialUpdateSchemaInZk.getValueSchemaID(),
partialUpdateSchemaInZk.getId());
return;
}
}
// Partial update schema doesn't exist right now, try to register it.
SchemaResponse addDerivedSchemaResponse = controllerClient.retryableRequest(
DEFAULT_RETRY_TIMES,
c -> c.addDerivedSchema(storeName, valueSchemaId, partialUpdateSchema.toString()));
if (addDerivedSchemaResponse.isError()) {
throw new VeniceException(
"Error when adding partial update schema for value schema v" + valueSchemaId + " to system store " + storeName
+ " in cluster " + clusterName + " after retries. Error: " + addDerivedSchemaResponse.getError());
}
LOGGER.info(
"Added partial update schema v{}-{} to system store {}.",
valueSchemaId,
addDerivedSchemaResponse.getDerivedSchemaId(),
storeName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@
import com.linkedin.venice.exceptions.ErrorType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.Utils;
import java.io.IOException;
import java.util.Optional;
import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.Test;


public class ControllerClientBackedSystemSchemaInitializerTest {
@Test
public void testCreateSystemStoreAndRegisterSchema() {
public void testCreateSystemStoreAndRegisterSchema() throws IOException {

try (ControllerClientBackedSystemSchemaInitializer initializer = new ControllerClientBackedSystemSchemaInitializer(
AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE,
"testCluster",
null,
null,
false,
true,
Optional.empty(),
"",
"",
Expand All @@ -50,7 +54,7 @@ public void testCreateSystemStoreAndRegisterSchema() {
"testCluster",
null,
null,
false,
true,
Optional.empty(),
"",
"d2Service",
Expand All @@ -68,17 +72,33 @@ public void testCreateSystemStoreAndRegisterSchema() {
NewStoreResponse newStoreResponse = mock(NewStoreResponse.class);
doReturn(newStoreResponse).when(controllerClient).createNewSystemStore(any(), any(), any(), any());
MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class);
doReturn(new MultiSchemaResponse.Schema[0]).when(multiSchemaResponse).getSchemas();
doReturn(multiSchemaResponse).when(controllerClient).getAllValueSchema(any());
SchemaResponse schemaResponse = mock(SchemaResponse.class);
doReturn(schemaResponse).when(controllerClient).addValueSchema(any(), any(), anyInt(), any());
MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[2];
Schema valueSchema = Utils.getSchemaFromResource("avro/StoreMetaValue/v1/StoreMetaValue.avsc");
schemas[0] = new MultiSchemaResponse.Schema();
schemas[0].setId(1);
schemas[0].setSchemaStr(valueSchema.toString());
schemas[1] = new MultiSchemaResponse.Schema();
schemas[1].setId(1);
schemas[1].setDerivedSchemaId(1);
schemas[1]
.setSchemaStr(WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema).toString());
doReturn(schemas).when(multiSchemaResponse).getSchemas();
doReturn(multiSchemaResponse).when(controllerClient).getAllValueAndDerivedSchema(any());
doReturn(mock(SchemaResponse.class)).when(controllerClient).addValueSchema(any(), any(), anyInt(), any());
doReturn(mock(SchemaResponse.class)).when(controllerClient).addDerivedSchema(any(), anyInt(), any());
doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any(), any());
doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any());
initializer.setControllerClient(controllerClient);
initializer.execute();
verify(controllerClient, times(1)).createNewSystemStore(any(), any(), any(), any());
verify(controllerClient, times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion()))
.addValueSchema(any(), any(), anyInt(), any());
verify(
controllerClient,
times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion() - 1))
.addValueSchema(any(), any(), anyInt(), any());
verify(
controllerClient,
times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion() - 1))
.addDerivedSchema(any(), anyInt(), any());
}
verify(controllerClient, times(1)).close();
}
Expand Down

0 comments on commit 16ed564

Please sign in to comment.