Skip to content

Commit

Permalink
[common][client] Fix metadata schema fetcher cache to use map instead…
Browse files Browse the repository at this point in the history
… of list (#1328)

* [common][client] Fix metadata schema fetcher cache to use map instead of list

It seems the current schema cache implementation is prone to bugs if there are gaps in the cache or if the number of elements goes to double digits.  This fixes it by using a map instead.
  • Loading branch information
ZacAttack authored Nov 20, 2024
1 parent 88c6ba5 commit 6fac562
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,36 @@
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import com.linkedin.venice.utils.SparseConcurrentList;


public class ReplicationMetadataSchemaRepository {
private final ControllerClient controllerClient;

private List<RmdSchemaEntry> cachedReplicationMetadataSchemas = new ArrayList<>();
private final SparseConcurrentList<RmdSchemaEntry> cachedReplicationMetadataSchemas = new SparseConcurrentList<>();

public ReplicationMetadataSchemaRepository(ControllerClient controllerClient) {
this.controllerClient = controllerClient;
}

public RmdSchemaEntry getReplicationMetadataSchemaById(String storeName, int replicationMetadataSchemaId) {
if (cachedReplicationMetadataSchemas.size() < replicationMetadataSchemaId) {
if (cachedReplicationMetadataSchemas.get(replicationMetadataSchemaId) == null) {
MultiSchemaResponse multiReplicationSchemaResponse = controllerClient.getAllReplicationMetadataSchemas(storeName);
if (multiReplicationSchemaResponse.isError()) {
throw new VeniceException(
"Failed to get store replication info for store: " + storeName + " with error: "
+ multiReplicationSchemaResponse.getError());
}
cachedReplicationMetadataSchemas = Arrays.stream(multiReplicationSchemaResponse.getSchemas())
.map(schema -> new RmdSchemaEntry(schema.getRmdValueSchemaId(), schema.getId(), schema.getSchemaStr()))
.collect(Collectors.toList());
if (cachedReplicationMetadataSchemas.size() < replicationMetadataSchemaId) {

for (MultiSchemaResponse.Schema schema: multiReplicationSchemaResponse.getSchemas()) {
cachedReplicationMetadataSchemas.computeIfAbsent(
schema.getRmdValueSchemaId(),
key -> new RmdSchemaEntry(schema.getRmdValueSchemaId(), schema.getId(), schema.getSchemaStr()));
}
if (cachedReplicationMetadataSchemas.get(replicationMetadataSchemaId) == null) {
throw new VeniceException("No available store replication metadata schema for store: " + storeName);
}
}
return cachedReplicationMetadataSchemas.get(replicationMetadataSchemaId - 1);
return cachedReplicationMetadataSchemas.get(replicationMetadataSchemaId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
doReturn(storeResponse).when(d2ControllerClient).getStore(storeName);
MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class);
doReturn(1).when(rmdSchemaFromMultiSchemaResponse).getRmdValueSchemaId();
doReturn(1).when(rmdSchemaFromMultiSchemaResponse).getId();
doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr();
doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse)
.getSchemas();
Expand Down Expand Up @@ -343,6 +345,8 @@ public void testConsumeAfterImageWithCompaction() throws ExecutionException, Int
doReturn(storeResponse).when(d2ControllerClient).getStore(storeName);
MultiSchemaResponse multiRMDSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema rmdSchemaFromMultiSchemaResponse = mock(MultiSchemaResponse.Schema.class);
doReturn(1).when(rmdSchemaFromMultiSchemaResponse).getRmdValueSchemaId();
doReturn(1).when(rmdSchemaFromMultiSchemaResponse).getId();
doReturn(rmdSchema.toString()).when(rmdSchemaFromMultiSchemaResponse).getSchemaStr();
doReturn(new MultiSchemaResponse.Schema[] { rmdSchemaFromMultiSchemaResponse }).when(multiRMDSchemaResponse)
.getSchemas();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,17 @@
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingDeleteRecord;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecordWithLogicalTimestamp;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V10_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V11_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V1_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V2_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V4_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V5_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V6_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V7_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V8_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V9_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
Expand Down Expand Up @@ -106,6 +115,22 @@ public class TestChangelogConsumer {
private VeniceClusterWrapper clusterWrapper;
private ControllerClient parentControllerClient;

List<Schema> SCHEMA_HISTORY = new ArrayList<Schema>() {
{
add(NAME_RECORD_V1_SCHEMA);
add(NAME_RECORD_V2_SCHEMA);
add(NAME_RECORD_V3_SCHEMA);
add(NAME_RECORD_V4_SCHEMA);
add(NAME_RECORD_V5_SCHEMA);
add(NAME_RECORD_V6_SCHEMA);
add(NAME_RECORD_V7_SCHEMA);
add(NAME_RECORD_V8_SCHEMA);
add(NAME_RECORD_V9_SCHEMA);
add(NAME_RECORD_V10_SCHEMA);
add(NAME_RECORD_V11_SCHEMA);
}
};

protected boolean isAAWCParallelProcessingEnabled() {
return false;
}
Expand Down Expand Up @@ -661,6 +686,7 @@ public void testSpecificRecordBootstrappingVeniceChangelogConsumer() throws Exce
setupControllerClient.retryableRequest(
5,
controllerClient1 -> setupControllerClient.addValueSchema(storeName, NAME_RECORD_V1_SCHEMA.toString()));

TestWriteUtils.runPushJob("Run push job", props);

TestMockTime testMockTime = new TestMockTime();
Expand Down Expand Up @@ -771,9 +797,12 @@ public void testSpecificRecordVeniceChangelogConsumer() throws Exception {
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.updateStore(storeName, storeParms));
// Registering real data schema as schema v2.
setupControllerClient.retryableRequest(
5,
controllerClient1 -> setupControllerClient.addValueSchema(storeName, NAME_RECORD_V1_SCHEMA.toString()));

for (Schema schema: SCHEMA_HISTORY) {
setupControllerClient
.retryableRequest(5, controllerClient1 -> setupControllerClient.addValueSchema(storeName, schema.toString()));
}

TestWriteUtils.runPushJob("Run push job", props);
TestMockTime testMockTime = new TestMockTime();
ZkServerWrapper localZkServer = multiRegionMultiClusterWrapper.getChildRegions().get(0).getZkServerWrapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ public void testTransitionToHAASControllerAsStorageClusterLeader() {
controllerClient
.sendEmptyPushAndWait(response.getName(), Utils.getUniqueString(), 100, 30 * Time.MS_PER_MINUTE)));
List<VeniceControllerWrapper> oldControllers = venice.getVeniceControllers();
List<VeniceControllerWrapper> newControllers = new ArrayList<>();
LiveInstance clusterLeader = helixAsAServiceWrapper.getClusterLeader(venice.getClusterName());
assertNotNull(clusterLeader, "Could not find the cluster leader from HAAS!");
assertFalse(
Expand All @@ -232,7 +231,7 @@ public void testTransitionToHAASControllerAsStorageClusterLeader() {
for (VeniceControllerWrapper oldController: oldControllers) {
venice.stopVeniceController(oldController.getPort());
oldController.close();
newControllers.add(venice.addVeniceController(enableControllerAndStorageClusterHAASProperties));
venice.addVeniceController(enableControllerAndStorageClusterHAASProperties);
}

waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ public void setUp() {
DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient(storeName, daVinciConfig);
client.subscribeAll().get();
clients.add(client);
// This is a very dumb and basic assertion that's only purpose is to get static analysis to not be mad
Assert.assertTrue(clients.get(0).getPartitionCount() > 0);
} catch (ExecutionException | InterruptedException e) {
throw new VeniceException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ public class TestWriteUtils {
public static final Schema NAME_RECORD_V6_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV6.avsc"));

public static final Schema NAME_RECORD_V7_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV7.avsc"));
public static final Schema NAME_RECORD_V8_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV8.avsc"));
public static final Schema NAME_RECORD_V9_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV9.avsc"));
public static final Schema NAME_RECORD_V10_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV10.avsc"));

public static final Schema NAME_RECORD_V11_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/NameV11.avsc"));

public static final Schema UNION_RECORD_V1_SCHEMA =
AvroCompatibilityHelper.parse(loadSchemaFileFromResource("valueSchema/UnionV1.avsc"));
public static final Schema UNION_RECORD_V2_SCHEMA =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"type" : "record",
"name" : "nameRecord",
"namespace" : "example.avro",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "lastName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "age",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "is",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "just",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "a",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "number",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"type" : "record",
"name" : "nameRecord",
"namespace" : "example.avro",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "lastName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "age",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "is",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "just",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "a",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "number",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "bro",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"type" : "record",
"name" : "nameRecord",
"namespace" : "example.avro",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "lastName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "age",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "is",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"type" : "record",
"name" : "nameRecord",
"namespace" : "example.avro",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "lastName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "age",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "is",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "just",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"type" : "record",
"name" : "nameRecord",
"namespace" : "example.avro",
"fields" : [ {
"name" : "firstName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "lastName",
"type" : "string",
"default" : "",
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}, {
"name" : "age",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "is",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "just",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
},
{
"name" : "a",
"type" : "int",
"default" : -1,
"custom_prop" : "custom_prop_value_2, custom_prop_value_1"
}]
}

0 comments on commit 6fac562

Please sign in to comment.