[fast-client] Wrong value schema id used when handling multi-get response in FC (#792)

* [fast-client] Wrong value schema id used when handling multi-get response in FC

batchGetTransportRequestCompletionHandler should obtain the correct dataRecordDeserializer
by using the per-record value schema id field of MultiGetResponseRecordV1. The handler was
using the transport response header's schema id, which is intended for the response protocol
version, not the writer schema version of the individual data records. As a result, FC ran
into data issues whenever a user record was written with schema id > 1, i.e. with new fields/data.
xunyin8 authored Dec 8, 2023
1 parent 8541dc4 commit f21b8f8
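
In short, the fix moves the deserializer lookup inside the per-record loop, keyed off each record's own schema id. The following condensed before/after sketch uses the names from the diff below (metrics and error handling elided; not a runnable excerpt):

// Before: one deserializer for the whole multi-get response, resolved from the
// transport response header's schema id. That id identifies the
// MultiGetResponseRecordV1 protocol version, not the writer schema of any value.
RecordDeserializer<V> dataRecordDeserializer =
    getDataRecordDeserializer(transportClientResponse.getSchemaId());

// After: each MultiGetResponseRecordV1 carries the schema id its value was
// written with, so the deserializer is resolved per record inside the loop.
for (MultiGetResponseRecordV1 r: records) {
  ByteBuffer decompressRecord = decompressRecord(
      transportClientResponse.getCompressionStrategy(),
      r.value,
      requestContext.currentVersion,
      compressor);
  RecordDeserializer<V> dataRecordDeserializer = getDataRecordDeserializer(r.getSchemaId());
  V deserializedValue = dataRecordDeserializer.deserialize(decompressRecord);
}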
Showing 20 changed files with 287 additions and 53 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -700,6 +700,8 @@ ext.createDiffFile = { ->
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/factory/ClientFactory.java',
// unit test for gRPC Transport Client is not straightforward, adding to exclusion list for now
':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/transport/GrpcTransportClient.java',
+ // unit test for deprecated DispatchingVsonStoreClient is not meaningful since most logic is in its parent class
+ ':!clients/venice-client/src/main/java/com/linkedin/venice/fastclient/DispatchingVsonStoreClient.java',

// venice-producer
':!clients/venice-producer/src/main/java/com/linkedin/venice/producer/online/OnlineProducerFactory.java',
@@ -5,6 +5,7 @@
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
+ import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ConcurrentRef;
import com.linkedin.venice.utils.ReferenceCounted;
@@ -362,7 +363,7 @@ private void swapCurrentVersion() {
setDaVinciCurrentVersion(version);
}

- public AvroStoreDeserializerCache getStoreDeserializerCache() {
+ public StoreDeserializerCache getStoreDeserializerCache() {
return storeDeserializerCache;
}
}
@@ -17,6 +17,7 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
+ import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ExceptionUtils;
@@ -58,7 +59,7 @@ public class VersionBackend {
private final Map<Integer, CompletableFuture<Void>> partitionFutures = new VeniceConcurrentHashMap<>();
private final int stopConsumptionTimeoutInSeconds;
private final StoreBackendStats storeBackendStats;
- private final AvroStoreDeserializerCache storeDeserializerCache;
+ private final StoreDeserializerCache storeDeserializerCache;
private final Lazy<VeniceCompressor> compressor;

/*
@@ -184,7 +185,7 @@ public <V> V read(
int userPartition,
byte[] keyBytes,
AbstractAvroChunkingAdapter<V> chunkingAdaptor,
- AvroStoreDeserializerCache<V> storeDeserializerCache,
+ StoreDeserializerCache<V> storeDeserializerCache,
int readerSchemaId,
BinaryDecoder binaryDecoder,
ByteBuffer reusableRawValue,
@@ -51,6 +51,8 @@
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
+ import com.linkedin.venice.serialization.StoreDeserializerCache;
+ import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
@@ -111,7 +113,7 @@ protected boolean removeEldestEntry(Map.Entry<Schema, GenericRecord> eldest) {
private RecordSerializer<K> keySerializer;
private RecordDeserializer<K> keyDeserializer;
private AvroStoreDeserializerCache<GenericRecord> genericRecordStoreDeserializerCache;
- private AvroStoreDeserializerCache<V> storeDeserializerCache;
+ private StoreDeserializerCache<V> storeDeserializerCache;
private AvroGenericReadComputeStoreClient<K, V> veniceClient;
private StoreBackend storeBackend;
private static ReferenceCounted<DaVinciBackend> daVinciBackend;
@@ -741,7 +743,7 @@ public synchronized void start() {
this.genericRecordStoreDeserializerCache =
new AvroStoreDeserializerCache(daVinciBackend.get().getSchemaRepository(), getStoreName(), true);
this.storeDeserializerCache = clientConfig.isSpecificClient()
- ? new AvroStoreDeserializerCache<>(
+ ? new AvroSpecificStoreDeserializerCache<>(
daVinciBackend.get().getSchemaRepository(),
getStoreName(),
clientConfig.getSpecificValueClass())
@@ -43,7 +43,9 @@
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.RawBytesStoreDeserializerCache;
+ import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+ import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
@@ -81,7 +83,7 @@ public class VeniceChangelogConsumerImpl<K, V> implements VeniceChangelogConsume
protected final CompressorFactory compressorFactory = new CompressorFactory();

protected final HashMap<Integer, VeniceCompressor> compressorMap = new HashMap<>();
- protected AvroStoreDeserializerCache<V> storeDeserializerCache;
+ protected StoreDeserializerCache<V> storeDeserializerCache;
private final AvroStoreDeserializerCache<RecordChangeEvent> recordChangeEventDeserializerCache;

protected ThinClientMetaStoreBasedRepository storeRepository;
@@ -157,7 +159,7 @@ public VeniceChangelogConsumerImpl(
// If a value class is supplied, we'll use a Specific record adapter
Class valueClass = changelogClientConfig.getInnerClientConfig().getSpecificValueClass();
this.userEventChunkingAdapter = new SpecificRecordChunkingAdapter();
- this.storeDeserializerCache = new AvroStoreDeserializerCache<>(storeRepository, storeName, valueClass);
+ this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(storeRepository, storeName, valueClass);
} else {
this.userEventChunkingAdapter = GenericChunkingAdapter.INSTANCE;
this.storeDeserializerCache = new AvroStoreDeserializerCache<>(storeRepository, storeName, true);
@@ -559,7 +561,7 @@ protected <T> T bufferAndAssembleRecordChangeEvent(
long recordOffset,
AbstractAvroChunkingAdapter<T> chunkingAdapter,
Lazy<RecordDeserializer<T>> recordDeserializer,
- AvroStoreDeserializerCache<T> deserializerCache,
+ StoreDeserializerCache<T> deserializerCache,
int readerSchemaId) {
T assembledRecord = null;
// Select compressor. We'll only construct compressors for version topics so this will return null for
@@ -688,7 +690,7 @@ protected Optional<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> con
AbstractAvroChunkingAdapter chunkingAdapter;
int readerSchemaId;
ReadOnlySchemaRepository schemaRepo;
- AvroStoreDeserializerCache deserializerCache;
+ StoreDeserializerCache deserializerCache;
if (pubSubTopicPartition.getPubSubTopic().isVersionTopic()) {
Schema valueSchema = schemaReader.getValueSchema(put.schemaId);
deserializerProvider =
@@ -901,7 +903,7 @@ protected void setStoreRepository(ThinClientMetaStoreBasedRepository repository)
if (changelogClientConfig.getInnerClientConfig().isSpecificClient()) {
// If a value class is supplied, we'll use a Specific record adapter
Class valueClass = changelogClientConfig.getInnerClientConfig().getSpecificValueClass();
- this.storeDeserializerCache = new AvroStoreDeserializerCache<>(storeRepository, storeName, valueClass);
+ this.storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(storeRepository, storeName, valueClass);
} else {
this.storeDeserializerCache = new AvroStoreDeserializerCache<>(storeRepository, storeName, true);
}
@@ -10,7 +10,6 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.partitioner.VenicePartitioner;
- import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
@@ -228,7 +227,7 @@ public void getByPartialKey(
boolean isChunked,
ReadResponse response,
int readerSchemaId,
- AvroStoreDeserializerCache<T> storeDeserializerCache,
+ StoreDeserializerCache<T> storeDeserializerCache,
VeniceCompressor compressor,
StreamingCallback<GenericRecord, GenericRecord> computingCallback) {

@@ -32,6 +32,8 @@
import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1;
import com.linkedin.venice.router.exception.VeniceKeyCountLimitException;
import com.linkedin.venice.schema.avro.ReadAvroProtocolDefinition;
+ import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
+ import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
@@ -73,7 +75,7 @@ public class DispatchingAvroGenericStoreClient<K, V> extends InternalAvroStoreCl
private final String BATCH_GET_TRANSPORT_EXCEPTION_FILTER_MESSAGE;
private final String COMPUTE_TRANSPORT_EXCEPTION_FILTER_MESSAGE;

- private final StoreMetadata metadata;
+ protected final StoreMetadata metadata;
private final int requiredReplicaCount;

private final ClientConfig config;
@@ -82,6 +84,8 @@

// Key serializer
private RecordSerializer<K> keySerializer;
+ protected StoreDeserializerCache<V> storeDeserializerCache;
+
private static final RecordSerializer<MultiGetRouterRequestKeyV1> MULTI_GET_REQUEST_SERIALIZER =
FastSerializerDeserializerFactory.getAvroGenericSerializer(MultiGetRouterRequestKeyV1.SCHEMA$);
private static final RecordSerializer<ComputeRouterRequestKeyV1> COMPUTE_REQUEST_SERIALIZER =
@@ -128,6 +132,7 @@ public DispatchingAvroGenericStoreClient(
String storeName = metadata.getStoreName();
BATCH_GET_TRANSPORT_EXCEPTION_FILTER_MESSAGE = "BatchGet Transport Exception for " + storeName;
COMPUTE_TRANSPORT_EXCEPTION_FILTER_MESSAGE = "Compute Transport Exception for " + storeName;
+ this.storeDeserializerCache = new AvroStoreDeserializerCache<>(metadata);
}

protected StoreMetadata getStoreMetadata() {
@@ -562,7 +567,6 @@ private void batchGetTransportRequestCompletionHandler(
requestContext.recordRequestDeserializationTime(
transportClientResponse.getRouteId(),
getLatencyInNS(nanoTsBeforeRequestDeserialization));
- RecordDeserializer<V> dataRecordDeserializer = getDataRecordDeserializer(transportClientResponse.getSchemaId());

List<MultiKeyRequestContext.KeyInfo<K>> keyInfos =
requestContext.keysForRoutes(transportClientResponse.getRouteId());
@@ -573,14 +577,16 @@
metadata.getCompressor(transportClientResponse.getCompressionStrategy(), requestContext.currentVersion);
for (MultiGetResponseRecordV1 r: records) {
long nanoTsBeforeDecompression = System.nanoTime();
+
ByteBuffer decompressRecord = decompressRecord(
transportClientResponse.getCompressionStrategy(),
r.value,
requestContext.currentVersion,
compressor);
- totalDecompressionTimeForResponse += System.nanoTime() - nanoTsBeforeDecompression;

long nanoTsBeforeDeserialization = System.nanoTime();
+ totalDecompressionTimeForResponse += nanoTsBeforeDeserialization - nanoTsBeforeDecompression;
+ RecordDeserializer<V> dataRecordDeserializer = getDataRecordDeserializer(r.getSchemaId());
V deserializedValue = dataRecordDeserializer.deserialize(decompressRecord);
requestContext.recordRecordDeserializationTime(
transportClientResponse.getRouteId(),
@@ -760,26 +766,13 @@ protected RecordDeserializer<MultiGetResponseRecordV1> getMultiGetResponseRecord
}

protected RecordDeserializer<V> getDataRecordDeserializer(int schemaId) throws VeniceClientException {
- Schema readerSchema = metadata.getLatestValueSchema();
- if (readerSchema == null) {
-   throw new VeniceClientException("Failed to get latest value schema for store: " + metadata.getStoreName());
- }
- Schema writerSchema = metadata.getValueSchema(schemaId);
- if (writerSchema == null) {
-   throw new VeniceClientException(
-       "Failed to get writer schema with id: " + schemaId + " from store: " + metadata.getStoreName());
- }
- return getValueDeserializer(writerSchema, readerSchema);
+ return storeDeserializerCache.getDeserializer(schemaId, metadata.getLatestValueSchemaId());
}

private RecordDeserializer<GenericRecord> getComputeResultRecordDeserializer(Schema resultSchema) {
return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(resultSchema, resultSchema);
}

- protected RecordDeserializer<V> getValueDeserializer(Schema writerSchema, Schema readerSchema) {
-   return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(writerSchema, readerSchema);
- }
-
private <T> T tryToDeserialize(RecordDeserializer<T> dataDeserializer, ByteBuffer data, int writerSchemaId, K key) {
return AbstractAvroStoreClient.tryToDeserializeWithVerboseLogging(
dataDeserializer,
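
The schema-resolution boilerplate deleted above now lives behind the StoreDeserializerCache abstraction. Judging purely from the call sites in this diff (getDeserializer(schemaId, metadata.getLatestValueSchemaId()) here, and getDeserializer(schemaId) in the specific client below), the contract is roughly the hedged sketch that follows; the real interface in com.linkedin.venice.serialization may differ:

// Hypothetical sketch of the StoreDeserializerCache contract as exercised by
// this diff; not the verbatim Venice interface.
public interface StoreDeserializerCache<T> {
  // Resolve a cached deserializer for values written with writerSchemaId,
  // projected onto the reader schema identified by readerSchemaId.
  RecordDeserializer<T> getDeserializer(int writerSchemaId, int readerSchemaId);

  // Specific-record variant: the reader schema is fixed by the compiled value
  // class, so only the writer schema id is needed.
  RecordDeserializer<T> getDeserializer(int writerSchemaId);
}

AvroStoreDeserializerCache and AvroSpecificStoreDeserializerCache, both imported throughout this change, would then be the generic-record and specific-record implementations.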
@@ -3,9 +3,9 @@
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.fastclient.meta.StoreMetadata;
+ import com.linkedin.venice.serialization.avro.AvroSpecificStoreDeserializerCache;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
- import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;


@@ -20,17 +20,12 @@ public DispatchingAvroSpecificStoreClient(StoreMetadata metadata, ClientConfig c
"SpecificValueClass in ClientConfig shouldn't be null when constructing a specific store client.");
}
this.valueClass = config.getSpecificValueClass();
-
FastSerializerDeserializerFactory.verifyWhetherFastSpecificDeserializerWorks(this.valueClass);
+ storeDeserializerCache = new AvroSpecificStoreDeserializerCache<>(metadata, valueClass);
}

@Override
protected RecordDeserializer<V> getDataRecordDeserializer(int schemaId) throws VeniceClientException {
- Schema writerSchema = getStoreMetadata().getValueSchema(schemaId);
- if (writerSchema == null) {
-   throw new VeniceClientException(
-       "Failed to get value schema for store: " + getStoreName() + " and id: " + schemaId);
- }
- return FastSerializerDeserializerFactory.getFastAvroSpecificDeserializer(writerSchema, valueClass);
+ return storeDeserializerCache.getDeserializer(schemaId);
}
}
@@ -1,5 +1,6 @@
package com.linkedin.venice.fastclient;

+ import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.fastclient.meta.StoreMetadata;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
@@ -22,7 +23,16 @@ protected RecordSerializer getKeySerializer(Schema keySchema) {
}

@Override
- protected RecordDeserializer<V> getValueDeserializer(Schema writerSchema, Schema readerSchema) {
+ protected RecordDeserializer<V> getDataRecordDeserializer(int schemaId) {
+   Schema readerSchema = metadata.getLatestValueSchema();
+   if (readerSchema == null) {
+     throw new VeniceClientException("Failed to get latest value schema for store: " + metadata.getStoreName());
+   }
+   Schema writerSchema = metadata.getValueSchema(schemaId);
+   if (writerSchema == null) {
+     throw new VeniceClientException(
+         "Failed to get writer schema with id: " + schemaId + " from store: " + metadata.getStoreName());
+   }
return SerializerDeserializerFactory.getVsonDeserializer(writerSchema, readerSchema);
}
}
@@ -54,6 +54,7 @@ public void testSimpleStreamingBatchGet() throws InterruptedException, Execution

TestClientSimulator client = new TestClientSimulator();
client.generateKeyValues(0, 1000)
+ .setExpectedValueSchemaId(5)
.partitionKeys(1)
.assignRouteToPartitions("https://host1.linkedin.com", 0)
.expectRequestWithKeysForPartitionOnRoute(1, 1, "https://host1.linkedin.com", 0)
@@ -557,6 +557,7 @@ private byte[] serializeBatchGetResponse(Set<String> Keys) {
routerRequestValue.setValue(valueByteBuffer);
routerRequestValue.keyIndex = count.getAndIncrement();
routerRequestValues.add(routerRequestValue);
+ routerRequestValue.setSchemaId(1);
});
return MULTI_GET_RESPONSE_SERIALIZER.serializeObjects(routerRequestValues);
}
@@ -14,6 +14,7 @@
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
+ import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.fastclient.factory.ClientFactory;
import com.linkedin.venice.fastclient.meta.AbstractClientRoutingStrategy;
@@ -104,6 +105,8 @@ public ClientConfig getClientConfig() {
private boolean longTailRetryEnabledForBatchGet = false;
private int longTailRetryThresholdForBatchGetInMicroSeconds = 0;

+ private int expectedValueSchemaId = 1;
+
private static class UnitTestRoutingStrategy extends AbstractClientRoutingStrategy {
@Override
public List<String> getReplicas(long requestId, List<String> replicas, int requiredReplicaCount) {
@@ -204,6 +207,11 @@ public TestClientSimulator respondToRequestWithError(int timeTick, int requestId
return this;
}

+ public TestClientSimulator setExpectedValueSchemaId(int id) {
+   expectedValueSchemaId = id;
+   return this;
+ }
+
@Override
public Future<RestResponse> restRequest(RestRequest request) {
throw new IllegalStateException("Unexpected rest request");
@@ -396,7 +404,7 @@ public CompletableFuture<Integer> execute() {
MultiGetResponseRecordV1 rec = new MultiGetResponseRecordV1();
rec.value = ByteBuffer.wrap(keySerializer.serialize(info.keyValues.get(info.orderedKeys.get(i))));
rec.keyIndex = i;
- rec.schemaId = 1;
+ rec.schemaId = expectedValueSchemaId;
multiGetResponse.add(rec);
}
RestResponseBuilder restResponseBuilder = new RestResponseBuilder();
@@ -585,7 +593,12 @@ public Schema getKeySchema() {

@Override
public Schema getValueSchema(int id) {
- return KEY_VALUE_SCHEMA;
+ if (id == expectedValueSchemaId) {
+   return KEY_VALUE_SCHEMA;
+ } else {
+   throw new VeniceException(
+       "Unexpected get schema call with id: " + id + ", expecting value schema id: " + expectedValueSchemaId);
+ }
}

@Override
@@ -600,7 +613,7 @@ public Schema getLatestValueSchema() {

@Override
public Integer getLatestValueSchemaId() {
- return 0;
+ return expectedValueSchemaId;
}

@Override
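
With these simulator hooks, a test can pin a non-default writer schema id; if the client ever resolves a deserializer with any other id (for example, the response header's protocol schema id), getValueSchema throws and the test fails fast. A usage sketch mirroring the updated testSimpleStreamingBatchGet above:

// Pin writer schema id 5 so that a schema lookup with any other id throws
// inside getValueSchema(), surfacing the original bug immediately.
TestClientSimulator client = new TestClientSimulator();
client.generateKeyValues(0, 1000)
    .setExpectedValueSchemaId(5)
    .partitionKeys(1)
    .assignRouteToPartitions("https://host1.linkedin.com", 0);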