From 16ed5640360bdc5237b110022c9a8f47b1d03de9 Mon Sep 17 00:00:00 2001 From: Shuhui Liu Date: Tue, 2 Jan 2024 16:22:52 -0800 Subject: [PATCH] Reduce the number of controller requests sent by system store initializer (#789) Today, the controller-client-backed initializer calls the get_value_or_derived_schema_id API once for every meta system store value schema to check whether its partial update schema exists. However, the controller also exposes the get_all_value_and_derived_schema API, which returns all schemas in one response, so the existence check can be done against that result. This commit switches to that API, which reduces the number of controller requests from n to 1. --- ...erClientBackedSystemSchemaInitializer.java | 85 ++++++++++--------- ...ientBackedSystemSchemaInitializerTest.java | 38 +++++++-- 2 files changed, 75 insertions(+), 48 deletions(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java index cf47fde9ed..c63cb74035 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java @@ -16,6 +16,7 @@ import com.linkedin.venice.exceptions.ErrorType; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.security.SSLFactory; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; @@ -23,9 +24,11 @@ import com.linkedin.venice.utils.Utils; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import 
java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.avro.Schema; @@ -134,18 +137,25 @@ public void execute(Map inputSchemas) { } MultiSchemaResponse multiSchemaResponse = - controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> c.getAllValueSchema(storeName)); + controllerClient.retryableRequest(DEFAULT_RETRY_TIMES, c -> c.getAllValueAndDerivedSchema(storeName)); if (multiSchemaResponse.isError()) { throw new VeniceException( "Error when getting all value schemas from system store " + storeName + " in cluster " + clusterName + " after retries. Error: " + multiSchemaResponse.getError()); } - Map schemasInZk = new HashMap<>(); - Arrays.stream(multiSchemaResponse.getSchemas()) - .forEach(schema -> schemasInZk.put(schema.getId(), AvroCompatibilityHelper.parse(schema.getSchemaStr()))); + Map valueSchemasInZk = new HashMap<>(); + List partialUpdateSchemasInZk = new ArrayList<>(); + Arrays.stream(multiSchemaResponse.getSchemas()).forEach(schema -> { + if (schema.isDerivedSchema()) { + partialUpdateSchemasInZk + .add(new DerivedSchemaEntry(schema.getId(), schema.getDerivedSchemaId(), schema.getSchemaStr())); + } else { + valueSchemasInZk.put(schema.getId(), AvroCompatibilityHelper.parse(schema.getSchemaStr())); + } + }); if (isSchemaResourceInLocal) { - registerLocalSchemaResources(storeName, schemaResources, schemasInZk); + registerLocalSchemaResources(storeName, schemaResources, valueSchemasInZk, partialUpdateSchemasInZk); } else { // For passed in new schemas, its version could be larger than protocolDefinition.getCurrentProtocolVersion(), // register schema directly. 
@@ -153,7 +163,7 @@ public void execute(Map inputSchemas) { checkAndMayRegisterValueSchema( storeName, entry.getKey(), - schemasInZk.get(entry.getKey()), + valueSchemasInZk.get(entry.getKey()), entry.getValue(), determineSchemaCompatabilityType()); } @@ -163,7 +173,8 @@ public void execute(Map inputSchemas) { private void registerLocalSchemaResources( String storeName, Map schemaResources, - Map schemasInZk) { + Map valueSchemasInZk, + List partialUpdateSchemasInZk) { for (int version = 1; version <= protocolDefinition.getCurrentProtocolVersion(); version++) { Schema schemaInLocalResources = schemaResources.get(version); if (schemaInLocalResources == null) { @@ -175,22 +186,18 @@ private void registerLocalSchemaResources( checkAndMayRegisterValueSchema( storeName, version, - schemasInZk.get(version), + valueSchemasInZk.get(version), schemaInLocalResources, determineSchemaCompatabilityType()); if (autoRegisterPartialUpdateSchema) { - checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources); + checkAndMayRegisterPartialUpdateSchema(storeName, version, schemaInLocalResources, partialUpdateSchemasInZk); } } } } DirectionalSchemaCompatibilityType determineSchemaCompatabilityType() { - if (protocolDefinition == AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE) { - return DirectionalSchemaCompatibilityType.FULL; - } - if (protocolDefinition == AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE) { return DirectionalSchemaCompatibilityType.BACKWARD; } @@ -317,35 +324,35 @@ private void checkAndMayRegisterValueSchema( private void checkAndMayRegisterPartialUpdateSchema( String storeName, int valueSchemaId, - Schema schemaInLocalResources) { - String partialUpdateSchema = - WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(schemaInLocalResources).toString(); - SchemaResponse getSchemaResponse = controllerClient.retryableRequest( - DEFAULT_RETRY_TIMES, - c -> c.getValueOrDerivedSchemaId(storeName, partialUpdateSchema), - r -> 
r.getError().contains("Can not find any registered value schema nor derived schema")); - if (getSchemaResponse.isError()) { - if (getSchemaResponse.getError().contains("Can not find any registered value schema nor derived schema")) { - // The derived schema doesn't exist right now, try to register it. - SchemaResponse addDerivedSchemaResponse = controllerClient.retryableRequest( - DEFAULT_RETRY_TIMES, - c -> c.addDerivedSchema(storeName, valueSchemaId, partialUpdateSchema)); - if (addDerivedSchemaResponse.isError()) { - throw new VeniceException( - "Error when adding derived schema for value schema v" + valueSchemaId + " to system store " + storeName - + " in cluster " + clusterName + " after retries. Error: " + addDerivedSchemaResponse.getError()); - } + Schema valueSchemaInLocalResources, + List partialUpdateSchemasInZk) { + Schema partialUpdateSchema = + WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchemaInLocalResources); + String partialUpdateSchemaToFind = AvroCompatibilityHelper.toParsingForm(partialUpdateSchema); + for (DerivedSchemaEntry partialUpdateSchemaInZk: partialUpdateSchemasInZk) { + if (partialUpdateSchemaToFind.equals(partialUpdateSchemaInZk.getCanonicalSchemaStr())) { LOGGER.info( - "Added derived schema v{} for value schema v{} to system store {}.", - addDerivedSchemaResponse.getDerivedSchemaId(), - valueSchemaId, - storeName); - } else { - throw new VeniceException( - "Error when getting derived schema from system store " + storeName + " in cluster " + clusterName - + " after retries. Error: " + getSchemaResponse.getError()); + "Partial update schema in system store {} is already registered as version {}-{}.", + storeName, + partialUpdateSchemaInZk.getValueSchemaID(), + partialUpdateSchemaInZk.getId()); + return; } } + // Partial update schema doesn't exist right now, try to register it. 
+ SchemaResponse addDerivedSchemaResponse = controllerClient.retryableRequest( + DEFAULT_RETRY_TIMES, + c -> c.addDerivedSchema(storeName, valueSchemaId, partialUpdateSchema.toString())); + if (addDerivedSchemaResponse.isError()) { + throw new VeniceException( + "Error when adding partial update schema for value schema v" + valueSchemaId + " to system store " + storeName + + " in cluster " + clusterName + " after retries. Error: " + addDerivedSchemaResponse.getError()); + } + LOGGER.info( + "Added partial update schema v{}-{} to system store {}.", + valueSchemaId, + addDerivedSchemaResponse.getDerivedSchemaId(), + storeName); } @Override diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java index 341b8bbfa2..b7e2f05010 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java @@ -17,22 +17,26 @@ import com.linkedin.venice.exceptions.ErrorType; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType; +import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.utils.Utils; +import java.io.IOException; import java.util.Optional; +import org.apache.avro.Schema; import org.testng.Assert; import org.testng.annotations.Test; public class ControllerClientBackedSystemSchemaInitializerTest { @Test - public void testCreateSystemStoreAndRegisterSchema() { + public void testCreateSystemStoreAndRegisterSchema() throws IOException { try 
(ControllerClientBackedSystemSchemaInitializer initializer = new ControllerClientBackedSystemSchemaInitializer( AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE, "testCluster", null, null, - false, + true, Optional.empty(), "", "", @@ -50,7 +54,7 @@ public void testCreateSystemStoreAndRegisterSchema() { "testCluster", null, null, - false, + true, Optional.empty(), "", "d2Service", @@ -68,17 +72,33 @@ public void testCreateSystemStoreAndRegisterSchema() { NewStoreResponse newStoreResponse = mock(NewStoreResponse.class); doReturn(newStoreResponse).when(controllerClient).createNewSystemStore(any(), any(), any(), any()); MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class); - doReturn(new MultiSchemaResponse.Schema[0]).when(multiSchemaResponse).getSchemas(); - doReturn(multiSchemaResponse).when(controllerClient).getAllValueSchema(any()); - SchemaResponse schemaResponse = mock(SchemaResponse.class); - doReturn(schemaResponse).when(controllerClient).addValueSchema(any(), any(), anyInt(), any()); + MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[2]; + Schema valueSchema = Utils.getSchemaFromResource("avro/StoreMetaValue/v1/StoreMetaValue.avsc"); + schemas[0] = new MultiSchemaResponse.Schema(); + schemas[0].setId(1); + schemas[0].setSchemaStr(valueSchema.toString()); + schemas[1] = new MultiSchemaResponse.Schema(); + schemas[1].setId(1); + schemas[1].setDerivedSchemaId(1); + schemas[1] + .setSchemaStr(WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema).toString()); + doReturn(schemas).when(multiSchemaResponse).getSchemas(); + doReturn(multiSchemaResponse).when(controllerClient).getAllValueAndDerivedSchema(any()); + doReturn(mock(SchemaResponse.class)).when(controllerClient).addValueSchema(any(), any(), anyInt(), any()); + doReturn(mock(SchemaResponse.class)).when(controllerClient).addDerivedSchema(any(), anyInt(), any()); doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any(), 
any()); doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any()); initializer.setControllerClient(controllerClient); initializer.execute(); verify(controllerClient, times(1)).createNewSystemStore(any(), any(), any(), any()); - verify(controllerClient, times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion())) - .addValueSchema(any(), any(), anyInt(), any()); + verify( + controllerClient, + times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion() - 1)) + .addValueSchema(any(), any(), anyInt(), any()); + verify( + controllerClient, + times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion() - 1)) + .addDerivedSchema(any(), anyInt(), any()); } verify(controllerClient, times(1)).close(); }