diff --git a/build.gradle b/build.gradle index f9d9660e40..06cbe80a74 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,7 @@ if (project.hasProperty('overrideBuildEnvironment')) { } def avroVersion = '1.10.2' -def avroUtilVersion = '0.3.19' +def avroUtilVersion = '0.3.21' def grpcVersion = '1.49.2' def kafkaGroup = 'com.linkedin.kafka' def kafkaVersion = '2.4.1.65' diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java index 9b3bab9150..068517615f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java @@ -135,13 +135,17 @@ public ActiveActiveStoreIngestionTask( StringAnnotatedStoreSchemaCache annotatedReadOnlySchemaRepository = new StringAnnotatedStoreSchemaCache(storeName, schemaRepository); - this.rmdSerDe = new RmdSerDe(annotatedReadOnlySchemaRepository, rmdProtocolVersionId); + this.rmdSerDe = new RmdSerDe( + annotatedReadOnlySchemaRepository, + rmdProtocolVersionId, + getServerConfig().isComputeFastAvroEnabled()); this.mergeConflictResolver = MergeConflictResolverFactory.getInstance() .createMergeConflictResolver( annotatedReadOnlySchemaRepository, rmdSerDe, getStoreName(), - isWriteComputationEnabled); + isWriteComputationEnabled, + getServerConfig().isComputeFastAvroEnabled()); this.remoteIngestionRepairService = builder.getRemoteIngestionRepairService(); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java index 1fee1e32d7..cb3de47ab3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreWriteComputeProcessor.java @@ -3,7 +3,7 @@ import com.linkedin.davinci.schema.merge.MergeRecordHelper; import com.linkedin.davinci.schema.writecompute.WriteComputeProcessor; import com.linkedin.davinci.schema.writecompute.WriteComputeSchemaValidator; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.serializer.AvroSerializer; @@ -132,7 +132,7 @@ RecordDeserializer getValueDeserializer(Schema writerSchema, Sche // Map in write compute needs to have consistent ordering. On the sender side, users may not care about ordering // in their maps. However, on the receiver side, we still want to make sure that the same serialized map bytes // always get deserialized into maps with the same entry ordering. - return MapOrderingPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); + return MapOrderPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); } private RecordSerializer getValueSerializer(int valueSchemaId) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java index dfc4ca83e5..c4ebd09d99 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolver.java @@ -11,7 +11,8 @@ import com.linkedin.davinci.replication.RmdWithValueSchemaId; import com.linkedin.davinci.schema.merge.ValueAndRmd; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.fast.MapOrderPreservingFastSerDeFactory; import com.linkedin.davinci.store.record.ValueRecord; import com.linkedin.venice.annotation.Threadsafe; import com.linkedin.venice.exceptions.VeniceException; @@ -20,7 +21,11 @@ import com.linkedin.venice.schema.rmd.RmdTimestampType; import com.linkedin.venice.schema.rmd.RmdUtils; import com.linkedin.venice.schema.writecompute.WriteComputeOperation; +import com.linkedin.venice.serializer.RecordDeserializer; +import com.linkedin.venice.serializer.RecordSerializer; import com.linkedin.venice.utils.AvroSchemaUtils; +import com.linkedin.venice.utils.SparseConcurrentList; +import com.linkedin.venice.utils.collections.BiIntKeyCache; import com.linkedin.venice.utils.lazy.Lazy; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -52,6 +57,11 @@ public class MergeConflictResolver { private final MergeResultValueSchemaResolver mergeResultValueSchemaResolver; private final RmdSerDe rmdSerde; private final boolean useFieldLevelTimestamp; + private final boolean fastAvroEnabled; + + private final SparseConcurrentList> serializerIndexedByValueSchemaId; + private final BiIntKeyCache> deserializerCacheForFullValue; + private final BiIntKeyCache>> deserializerCacheForUpdateValue; MergeConflictResolver( StringAnnotatedStoreSchemaCache storeSchemaCache, @@ -61,7 +71,8 @@ public class MergeConflictResolver { MergeByteBuffer mergeByteBuffer, MergeResultValueSchemaResolver mergeResultValueSchemaResolver, RmdSerDe rmdSerde, - boolean useFieldLevelTimestamp) { + boolean useFieldLevelTimestamp, + boolean fastAvroEnabled) { this.storeSchemaCache = Validate.notNull(storeSchemaCache); this.storeName = Validate.notNull(storeName); this.newRmdCreator = Validate.notNull(newRmdCreator); @@ -70,6 +81,18 @@ public class MergeConflictResolver { this.mergeByteBuffer = Validate.notNull(mergeByteBuffer); this.rmdSerde = Validate.notNull(rmdSerde); this.useFieldLevelTimestamp = useFieldLevelTimestamp; + this.fastAvroEnabled = fastAvroEnabled; + + this.serializerIndexedByValueSchemaId = new SparseConcurrentList<>(); + this.deserializerCacheForFullValue = new BiIntKeyCache<>((writerSchemaId, readerSchemaId) -> { + Schema writerSchema = getValueSchema(writerSchemaId); + Schema readerSchema = getValueSchema(readerSchemaId); + return this.fastAvroEnabled + ? MapOrderPreservingFastSerDeFactory.getDeserializer(writerSchema, readerSchema) + : MapOrderPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); + }); + this.deserializerCacheForUpdateValue = + new BiIntKeyCache<>((writerSchemaId, readerSchemaId) -> new SparseConcurrentList<>()); } /** @@ -247,7 +270,7 @@ public MergeConflictResult update( } final ByteBuffer updatedValueBytes = updatedValueAndRmd.getValue() == null ? null - : serializeMergedValueRecord(oldValueSchema, updatedValueAndRmd.getValue()); + : serializeMergedValueRecord(oldValueSchemaID, updatedValueAndRmd.getValue()); return new MergeConflictResult(updatedValueBytes, oldValueSchemaID, false, updatedValueAndRmd.getRmd()); } @@ -300,14 +323,13 @@ private MergeConflictResult mergePutWithFieldLevelTimestamp( } final SchemaEntry mergeResultValueSchemaEntry = mergeResultValueSchemaResolver.getMergeResultValueSchema(oldValueSchemaID, newValueSchemaID); - final Schema mergeResultValueSchema = mergeResultValueSchemaEntry.getSchema(); - final Schema newValueWriterSchema = getValueSchema(newValueSchemaID); /** * Note that it is important that the new value record should NOT use {@link mergeResultValueSchema}. * {@link newValueWriterSchema} is either the same as {@link mergeResultValueSchema} or it is a subset of * {@link mergeResultValueSchema}. */ - GenericRecord newValueRecord = deserializeValue(newValueBytes, newValueWriterSchema, newValueWriterSchema); + GenericRecord newValueRecord = + deserializerCacheForFullValue.get(newValueSchemaID, newValueSchemaID).deserialize(newValueBytes); ValueAndRmd oldValueAndRmd = createOldValueAndRmd( mergeResultValueSchemaEntry.getSchema(), mergeResultValueSchemaEntry.getId(), @@ -325,7 +347,8 @@ private MergeConflictResult mergePutWithFieldLevelTimestamp( if (mergedValueAndRmd.isUpdateIgnored()) { return MergeConflictResult.getIgnoredResult(); } - ByteBuffer mergedValueBytes = serializeMergedValueRecord(mergeResultValueSchema, mergedValueAndRmd.getValue()); + ByteBuffer mergedValueBytes = + serializeMergedValueRecord(mergeResultValueSchemaEntry.getId(), mergedValueAndRmd.getValue()); return new MergeConflictResult(mergedValueBytes, newValueSchemaID, false, mergedValueAndRmd.getRmd()); } @@ -380,7 +403,7 @@ private MergeConflictResult mergeDeleteWithFieldLevelTimestamp( } final ByteBuffer mergedValueBytes = mergedValueAndRmd.getValue() == null ? null - : serializeMergedValueRecord(oldValueSchema, mergedValueAndRmd.getValue()); + : serializeMergedValueRecord(oldValueSchemaID, mergedValueAndRmd.getValue()); return new MergeConflictResult(mergedValueBytes, oldValueSchemaID, false, mergedValueAndRmd.getRmd()); } @@ -402,8 +425,11 @@ private ValueAndRmd createOldValueAndRmd( int oldValueWriterSchemaID, Lazy oldValueBytesProvider, GenericRecord oldRmdRecord) { - final GenericRecord oldValueRecord = - createValueRecordFromByteBuffer(readerValueSchema, oldValueWriterSchemaID, oldValueBytesProvider.get()); + final GenericRecord oldValueRecord = createValueRecordFromByteBuffer( + readerValueSchema, + readerValueSchemaID, + oldValueWriterSchemaID, + oldValueBytesProvider.get()); // RMD record should contain a per-field timestamp and it should use the RMD schema generated from // mergeResultValueSchema. @@ -418,13 +444,13 @@ private ValueAndRmd createOldValueAndRmd( private GenericRecord createValueRecordFromByteBuffer( Schema readerValueSchema, + int readerValueSchemaID, int oldValueWriterSchemaID, ByteBuffer oldValueBytes) { if (oldValueBytes == null) { return AvroSchemaUtils.createGenericRecord(readerValueSchema); } - final Schema oldValueWriterSchema = getValueSchema(oldValueWriterSchemaID); - return deserializeValue(oldValueBytes, oldValueWriterSchema, readerValueSchema); + return deserializerCacheForFullValue.get(oldValueWriterSchemaID, readerValueSchemaID).deserialize(oldValueBytes); } private GenericRecord convertRmdToUseReaderValueSchema( @@ -439,13 +465,6 @@ private GenericRecord convertRmdToUseReaderValueSchema( return rmdSerde.deserializeRmdBytes(writerValueSchemaID, readerValueSchemaID, rmdBytes); } - private GenericRecord deserializeValue(ByteBuffer bytes, Schema writerSchema, Schema readerSchema) { - /** - * TODO: Refactor this to use {@link com.linkedin.venice.serialization.StoreDeserializerCache} - */ - return MapOrderingPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema).deserialize(bytes); - } - private boolean ignoreNewPut( final int oldValueSchemaID, GenericRecord oldValueFieldTimestampsRecord, @@ -582,9 +601,17 @@ private GenericRecord deserializeWriteComputeBytes( int readerValueSchemaId, int updateProtocolVersion, ByteBuffer updateBytes) { - Schema writerSchema = getWriteComputeSchema(writerValueSchemaId, updateProtocolVersion); - Schema readerSchema = getWriteComputeSchema(readerValueSchemaId, updateProtocolVersion); - return deserializeValue(updateBytes, writerSchema, readerSchema); + RecordDeserializer deserializer = + deserializerCacheForUpdateValue.get(writerValueSchemaId, readerValueSchemaId) + .computeIfAbsent(updateProtocolVersion, ignored -> { + Schema writerSchema = getWriteComputeSchema(writerValueSchemaId, updateProtocolVersion); + Schema readerSchema = getWriteComputeSchema(readerValueSchemaId, updateProtocolVersion); + return this.fastAvroEnabled + ? MapOrderPreservingFastSerDeFactory.getDeserializer(writerSchema, readerSchema) + : MapOrderPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); + }); + + return deserializer.deserialize(updateBytes); } private ValueAndRmd prepareValueAndRmdForUpdate( @@ -603,8 +630,8 @@ private ValueAndRmd prepareValueAndRmdForUpdate( * case, the value must be retrieved from storage engine, and is prepended with schema ID. */ int schemaId = ValueRecord.parseSchemaId(oldValueBytes.array()); - Schema writerSchema = getValueSchema(schemaId); - newValue = deserializeValue(oldValueBytes, writerSchema, readerValueSchemaEntry.getSchema()); + newValue = + deserializerCacheForFullValue.get(schemaId, readerValueSchemaEntry.getId()).deserialize(oldValueBytes); } GenericRecord newRmd = newRmdCreator.apply(readerValueSchemaEntry.getId()); newRmd.put(TIMESTAMP_FIELD_POS, createPerFieldTimestampRecord(newRmd.getSchema(), 0L, newValue)); @@ -741,10 +768,16 @@ private boolean ignoreNewUpdate( } } - private ByteBuffer serializeMergedValueRecord(Schema mergedValueSchema, GenericRecord mergedValue) { + private ByteBuffer serializeMergedValueRecord(int mergedValueSchemaId, GenericRecord mergedValue) { // TODO: avoid serializing the merged value result here and instead serializing it before persisting it. The goal // is to avoid back-and-forth ser/de. Because when the merged result is read before it is persisted, we may need // to deserialize it. - return ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(mergedValueSchema).serialize(mergedValue)); + RecordSerializer serializer = serializerIndexedByValueSchemaId.computeIfAbsent(mergedValueSchemaId, ignored -> { + Schema mergedValueSchema = getValueSchema(mergedValueSchemaId); + return fastAvroEnabled + ? MapOrderPreservingFastSerDeFactory.getSerializer(mergedValueSchema) + : MapOrderPreservingSerDeFactory.getSerializer(mergedValueSchema); + }); + return ByteBuffer.wrap(serializer.serialize(mergedValue)); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolverFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolverFactory.java index 0023815722..bb1c333328 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolverFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/MergeConflictResolverFactory.java @@ -21,7 +21,8 @@ public MergeConflictResolver createMergeConflictResolver( StringAnnotatedStoreSchemaCache annotatedReadOnlySchemaRepository, RmdSerDe rmdSerDe, String storeName, - boolean rmdUseFieldLevelTs) { + boolean rmdUseFieldLevelTs, + boolean fastAvroEnabled) { MergeRecordHelper mergeRecordHelper = new CollectionTimestampMergeRecordHelper(); return new MergeConflictResolver( annotatedReadOnlySchemaRepository, @@ -31,13 +32,14 @@ public MergeConflictResolver createMergeConflictResolver( new MergeByteBuffer(), new MergeResultValueSchemaResolverImpl(annotatedReadOnlySchemaRepository, storeName), rmdSerDe, - rmdUseFieldLevelTs); + rmdUseFieldLevelTs, + fastAvroEnabled); } public MergeConflictResolver createMergeConflictResolver( StringAnnotatedStoreSchemaCache annotatedReadOnlySchemaRepository, RmdSerDe rmdSerDe, String storeName) { - return createMergeConflictResolver(annotatedReadOnlySchemaRepository, rmdSerDe, storeName, false); + return createMergeConflictResolver(annotatedReadOnlySchemaRepository, rmdSerDe, storeName, false, true); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java index e22a25f1ea..c82e2d8656 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/replication/merge/RmdSerDe.java @@ -1,7 +1,8 @@ package com.linkedin.davinci.replication.merge; import com.linkedin.davinci.replication.RmdWithValueSchemaId; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.fast.MapOrderPreservingFastSerDeFactory; import com.linkedin.venice.annotation.Threadsafe; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.schema.rmd.RmdSchemaEntry; @@ -30,16 +31,27 @@ public class RmdSerDe { private final SparseConcurrentList rmdSchemaIndexedByValueSchemaId; private final SparseConcurrentList> rmdSerializerIndexedByValueSchemaId; private final BiIntKeyCache> deserializerCache; + private final boolean fastAvroEnabled; public RmdSerDe(StringAnnotatedStoreSchemaCache annotatedStoreSchemaCache, int rmdVersionId) { + this(annotatedStoreSchemaCache, rmdVersionId, true); + } + + public RmdSerDe( + StringAnnotatedStoreSchemaCache annotatedStoreSchemaCache, + int rmdVersionId, + boolean fastAvroEnabled) { this.annotatedStoreSchemaCache = annotatedStoreSchemaCache; this.rmdVersionId = rmdVersionId; this.rmdSchemaIndexedByValueSchemaId = new SparseConcurrentList<>(); this.rmdSerializerIndexedByValueSchemaId = new SparseConcurrentList<>(); + this.fastAvroEnabled = fastAvroEnabled; this.deserializerCache = new BiIntKeyCache<>((writerSchemaId, readerSchemaId) -> { Schema rmdWriterSchema = getRmdSchema(writerSchemaId); Schema rmdReaderSchema = getRmdSchema(readerSchemaId); - return MapOrderingPreservingSerDeFactory.getDeserializer(rmdWriterSchema, rmdReaderSchema); + return this.fastAvroEnabled + ? MapOrderPreservingFastSerDeFactory.getDeserializer(rmdWriterSchema, rmdReaderSchema) + : MapOrderPreservingSerDeFactory.getDeserializer(rmdWriterSchema, rmdReaderSchema); }); } @@ -98,6 +110,8 @@ private RecordDeserializer getRmdDeserializer(final int writerSch private RecordSerializer generateRmdSerializer(int valueSchemaId) { Schema replicationMetadataSchema = getRmdSchema(valueSchemaId); - return MapOrderingPreservingSerDeFactory.getSerializer(replicationMetadataSchema); + return fastAvroEnabled + ? MapOrderPreservingFastSerDeFactory.getSerializer(replicationMetadataSchema) + : MapOrderPreservingSerDeFactory.getSerializer(replicationMetadataSchema); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java index 9540677db7..109a3283f8 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/SortBasedCollectionFieldOpHandler.java @@ -59,6 +59,12 @@ public UpdateResultStatus handlePutList( collectionFieldRmd.setPutOnlyPartLength(toPutList.size()); return UpdateResultStatus.COMPLETELY_UPDATED; } + /** + * LinkedList is more efficient for the following add/remove operations. + */ + if (!toPutList.isEmpty() && !(toPutList instanceof LinkedList)) { + toPutList = new LinkedList<>((toPutList)); + } // The current list is NOT in the put-only state. So we need to de-dup the incoming list. deDupListFromEnd(toPutList); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/Utils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/Utils.java index 365b6ed167..d8bffa5149 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/Utils.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/schema/merge/Utils.java @@ -1,6 +1,8 @@ package com.linkedin.davinci.schema.merge; import com.linkedin.davinci.utils.IndexedHashMap; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; @@ -18,6 +20,12 @@ static IndexedHashMap createElementToActiveTsMap( final int putOnlyPartLength) { IndexedHashMap activeElementToTsMap = new IndexedHashMap<>(existingElements.size()); int idx = 0; + if (!existingElements.isEmpty() && activeTimestamps instanceof LinkedList) { + /** + * LinkedList is not efficient for get operation + */ + activeTimestamps = new ArrayList<>(activeTimestamps); + } for (T existingElement: existingElements) { final long activeTimestamp; if (idx < putOnlyPartLength) { @@ -41,6 +49,12 @@ static IndexedHashMap createDeletedElementToTsMap( ) { IndexedHashMap elementToTimestampMap = new IndexedHashMap<>(); int idx = 0; + if (!deletedTimestamps.isEmpty() && deletedElements instanceof LinkedList) { + /** + * LinkedList is not efficient for get operation + */ + deletedElements = new ArrayList<>(deletedElements); + } for (long deletedTimestamp: deletedTimestamps) { if (deletedTimestamp >= minTimestamp) { elementToTimestampMap.put(deletedElements.get(idx), deletedTimestamp); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDatumReader.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDatumReader.java index 6dbfa430d0..14a9607c29 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDatumReader.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDatumReader.java @@ -2,8 +2,6 @@ import com.linkedin.davinci.utils.IndexedHashMap; import com.linkedin.davinci.utils.IndexedMap; -import java.util.Collection; -import java.util.LinkedList; import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -18,15 +16,6 @@ public MapOrderPreservingDatumReader(Schema writer, Schema reader) { super(writer, reader); } - @Override - protected Object newArray(Object old, int size, Schema schema) { - if (old instanceof Collection) { - ((Collection) old).clear(); - return old; - } else - return new LinkedList<>(); - } - @Override protected Object newMap(Object old, int size) { if (old instanceof IndexedMap) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDeserializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDeserializer.java index a1dcb07a22..34ba0e53ec 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDeserializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingDeserializer.java @@ -10,7 +10,7 @@ */ public class MapOrderPreservingDeserializer extends AvroGenericDeserializer { /** - * Constructor is made package-private so that users of this class should create it via {@link MapOrderingPreservingSerDeFactory} + * Constructor is made package-private so that users of this class should create it via {@link MapOrderPreservingSerDeFactory} */ MapOrderPreservingDeserializer(Schema writer, Schema reader) { super(new MapOrderPreservingDatumReader<>(writer, reader)); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderingPreservingSerDeFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeFactory.java similarity index 93% rename from clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderingPreservingSerDeFactory.java rename to clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeFactory.java index 6d1b392116..743082b4eb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderingPreservingSerDeFactory.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeFactory.java @@ -10,7 +10,7 @@ * This class is a factory that creates {@link MapOrderPreservingSerializer} and {@link MapOrderPreservingDeserializer} * with given schemas and cache them. */ -public class MapOrderingPreservingSerDeFactory extends SerializerDeserializerFactory { +public class MapOrderPreservingSerDeFactory extends SerializerDeserializerFactory { private static final Map> SERIALIZER_MAP = new VeniceConcurrentHashMap<>(); private static final Map DESERIALIZER_MAP = new VeniceConcurrentHashMap<>(); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerializer.java index 4ff80e3077..b9f8278d0d 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerializer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerializer.java @@ -10,7 +10,7 @@ */ public class MapOrderPreservingSerializer extends AvroSerializer { /** - * Constructor is made package-private so that users of this class should create it via {@link MapOrderingPreservingSerDeFactory} + * Constructor is made package-private so that users of this class should create it via {@link MapOrderPreservingSerDeFactory} * @param schema */ MapOrderPreservingSerializer(Schema schema) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastDeserializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastDeserializer.java new file mode 100644 index 0000000000..29bcefadf1 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastDeserializer.java @@ -0,0 +1,31 @@ +package com.linkedin.davinci.serializer.avro.fast; + +import com.linkedin.avro.fastserde.FastGenericDatumReader; +import com.linkedin.avro.fastserde.FastSerdeCache; +import com.linkedin.avro.fastserde.customized.DatumReaderCustomization; +import com.linkedin.davinci.utils.IndexedHashMap; +import com.linkedin.davinci.utils.IndexedMap; +import com.linkedin.venice.serializer.AvroGenericDeserializer; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + + +public class MapOrderPreservingFastDeserializer extends AvroGenericDeserializer { + public MapOrderPreservingFastDeserializer(Schema writer, Schema reader) { + super( + new FastGenericDatumReader<>( + writer, + reader, + FastSerdeCache.getDefaultInstance(), + null, + new DatumReaderCustomization.Builder().setNewMapOverrideFunc((old, size) -> { + if (old instanceof IndexedMap) { + ((Map) old).clear(); + return old; + } else { + return new IndexedHashMap<>(size); + } + }).build())); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerDeFactory.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerDeFactory.java new file mode 100644 index 0000000000..087b16952f --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerDeFactory.java @@ -0,0 +1,27 @@ +package com.linkedin.davinci.serializer.avro.fast; + +import com.linkedin.venice.serializer.SerializerDeserializerFactory; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import java.util.Map; +import org.apache.avro.Schema; + + +/** + * Factory to create fast serializer/deserializer of supporting map ordering. + */ +public class MapOrderPreservingFastSerDeFactory { + private static final Map SERIALIZER_MAP = new VeniceConcurrentHashMap<>(); + private static final Map DESERIALIZER_MAP = + new VeniceConcurrentHashMap<>(); + + public static MapOrderPreservingFastSerializer getSerializer(Schema schema) { + return (MapOrderPreservingFastSerializer) SERIALIZER_MAP + .computeIfAbsent(schema, s -> new MapOrderPreservingFastSerializer<>(s)); + } + + public static MapOrderPreservingFastDeserializer getDeserializer(Schema writerSchema, Schema readerSchema) { + return DESERIALIZER_MAP.computeIfAbsent( + new SerializerDeserializerFactory.SchemaPairAndClassContainer(writerSchema, readerSchema, Object.class), + o -> new MapOrderPreservingFastDeserializer(writerSchema, readerSchema)); + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerializer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerializer.java new file mode 100644 index 0000000000..d2addbd202 --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/serializer/avro/fast/MapOrderPreservingFastSerializer.java @@ -0,0 +1,43 @@ +package com.linkedin.davinci.serializer.avro.fast; + +import com.linkedin.avro.fastserde.FastGenericDatumWriter; +import com.linkedin.avro.fastserde.FastSerdeCache; +import com.linkedin.avro.fastserde.FastSpecificDatumWriter; +import com.linkedin.avro.fastserde.customized.DatumWriterCustomization; +import com.linkedin.davinci.utils.IndexedHashMap; +import com.linkedin.venice.serializer.AvroSerializer; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.SortedMap; +import org.apache.avro.Schema; + + +public class MapOrderPreservingFastSerializer extends AvroSerializer { + private static final DatumWriterCustomization MAP_ORDER_PRESERVING_CHECK_CUSTOMIZATION = + new DatumWriterCustomization.Builder().setCheckMapTypeFunction(datum -> { + Map map = (Map) datum; + if (map.isEmpty()) { + return; + } + if (!(map instanceof LinkedHashMap || map instanceof IndexedHashMap || map instanceof SortedMap)) { + throw new IllegalStateException( + "Expect map to be either a LinkedHashMap or a IndexedHashMap or a SortedMap because" + + " the notion of ordering is required. Otherwise, it does not make sense to preserve \"order\". " + + "Got datum type: " + map.getClass()); + } + }).build(); + + public MapOrderPreservingFastSerializer(Schema schema) { + super( + new FastGenericDatumWriter<>( + schema, + null, + FastSerdeCache.getDefaultInstance(), + MAP_ORDER_PRESERVING_CHECK_CUSTOMIZATION), + new FastSpecificDatumWriter<>( + schema, + null, + FastSerdeCache.getDefaultInstance(), + MAP_ORDER_PRESERVING_CHECK_CUSTOMIZATION)); + } +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeBase.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeBase.java index d86b287dce..821d2ec904 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeBase.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeBase.java @@ -4,7 +4,7 @@ import static org.mockito.Mockito.when; import com.linkedin.davinci.replication.merge.helper.utils.ValueAndDerivedSchemas; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.davinci.utils.IndexedHashMap; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.schema.rmd.RmdConstants; @@ -123,10 +123,10 @@ protected GenericRecord deserializeValueRecord(ByteBuffer valueByteBuffer) { } protected RecordSerializer getSerializer(Schema writerSchema) { - return MapOrderingPreservingSerDeFactory.getAvroGenericSerializer(writerSchema); + return MapOrderPreservingSerDeFactory.getAvroGenericSerializer(writerSchema); } protected RecordDeserializer getDeserializer(Schema writerSchema, Schema readerSchema) { - return MapOrderingPreservingSerDeFactory.getAvroGenericDeserializer(writerSchema, readerSchema); + return MapOrderPreservingSerDeFactory.getAvroGenericDeserializer(writerSchema, readerSchema); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeConflictResolver.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeConflictResolver.java index f19849ca40..cfdfd9308e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeConflictResolver.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeConflictResolver.java @@ -9,7 +9,7 @@ import static org.mockito.Mockito.mock; import com.linkedin.davinci.replication.merge.helper.utils.ValueAndDerivedSchemas; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdConstants; @@ -129,11 +129,11 @@ protected GenericRecord createRmdWithFieldLevelTimestamp( } protected RecordSerializer getSerializer(Schema writerSchema) { - return MapOrderingPreservingSerDeFactory.getSerializer(writerSchema); + return MapOrderPreservingSerDeFactory.getSerializer(writerSchema); } protected RecordDeserializer getDeserializer(Schema writerSchema, Schema readerSchema) { - return MapOrderingPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); + return MapOrderPreservingSerDeFactory.getDeserializer(writerSchema, readerSchema); } protected Utf8 toUtf8(String str) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java index c60e47d1b0..7cd8673f56 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithFieldLevelTimestamp.java @@ -10,7 +10,7 @@ import static org.mockito.Mockito.mock; import com.linkedin.davinci.replication.RmdWithValueSchemaId; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdConstants; @@ -50,7 +50,7 @@ public void testUpdateIgnoredFieldUpdate() { updateFieldWriteComputeRecord.put("age", 66); updateFieldWriteComputeRecord.put("name", "Venice"); ByteBuffer writeComputeBytes = ByteBuffer.wrap( - MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); + MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); final long valueLevelTimestamp = 10L; Map fieldNameToTimestampMap = new HashMap<>(); fieldNameToTimestampMap.put("age", 10L); @@ -104,7 +104,7 @@ public void testWholeFieldUpdate() { oldValueRecord.put("name", "Kafka"); oldValueRecord.put("intArray", Arrays.asList(1, 2, 3)); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV2).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV2).serialize(oldValueRecord)); // Set up Write Compute request. Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV2); @@ -140,7 +140,7 @@ public void testWholeFieldUpdate() { updateFieldPartialUpdateRecord1.put("name", "Venice"); updateFieldPartialUpdateRecord1.put("intArray", Arrays.asList(6, 7, 8)); ByteBuffer writeComputeBytes1 = ByteBuffer.wrap( - MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldPartialUpdateRecord1)); + MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldPartialUpdateRecord1)); MergeConflictResult mergeConflictResult = mergeConflictResolver.update( Lazy.of(() -> oldValueBytes), rmdWithValueSchemaId, @@ -155,7 +155,7 @@ public void testWholeFieldUpdate() { GenericRecord updateFieldPartialUpdateRecord2 = AvroSchemaUtils.createGenericRecord(writeComputeSchema); updateFieldPartialUpdateRecord2.put("intArray", Arrays.asList(10, 20, 30, 40)); ByteBuffer writeComputeBytes2 = ByteBuffer.wrap( - MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldPartialUpdateRecord2)); + MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldPartialUpdateRecord2)); ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); mergeConflictResult = mergeConflictResolver.update( @@ -215,7 +215,7 @@ public void testCollectionMerge() { stringMap.put("2", "two"); oldValueRecord.put("stringMap", stringMap); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); // Set up Write Compute request. Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV1); @@ -226,8 +226,8 @@ public void testCollectionMerge() { updateBuilder.setElementsToAddToListField("intArray", Arrays.asList(6, 7, 8)); GenericRecord updateFieldRecord = updateBuilder.build(); - ByteBuffer writeComputeBytes = ByteBuffer - .wrap(MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldRecord)); + ByteBuffer writeComputeBytes = + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldRecord)); // Set up current replication metadata. final long valueLevelTimestamp = 10L; @@ -316,7 +316,7 @@ public void testCollectionMerge() { // Validate updated value. Assert.assertNotNull(mergeConflictResult.getNewValue()); ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); - GenericRecord updatedValueRecord = MapOrderingPreservingSerDeFactory.getDeserializer(personSchemaV1, personSchemaV1) + GenericRecord updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(personSchemaV1, personSchemaV1) .deserialize(updatedValueBytes.array()); Assert.assertEquals(updatedValueRecord.get("age"), 99); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithValueLevelTimestamp.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithValueLevelTimestamp.java index c018d37f51..68192e37eb 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithValueLevelTimestamp.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/replication/merge/TestMergeUpdateWithValueLevelTimestamp.java @@ -11,7 +11,7 @@ import static org.mockito.Mockito.mock; import com.linkedin.davinci.replication.RmdWithValueSchemaId; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdConstants; @@ -102,7 +102,7 @@ public void testUpdateIgnoredFieldUpdateWithEvolvedSchema() { updateFieldWriteComputeRecord.put("age", 66); updateFieldWriteComputeRecord.put("name", "Venice"); ByteBuffer writeComputeBytes = ByteBuffer.wrap( - MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); + MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); final long valueLevelTimestamp = 10L; GenericRecord rmdRecord = createRmdWithValueLevelTimestamp(personRmdSchemaV1, valueLevelTimestamp); RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord); @@ -155,7 +155,7 @@ public void testWholeFieldUpdate() { stringMap.put("2", "two"); oldValueRecord.put("stringMap", stringMap); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); // Set up partial update request. Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV1); @@ -238,7 +238,7 @@ public void testWholeFieldUpdate() { // Validate updated value. Assert.assertNotNull(mergeConflictResult.getNewValue()); ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); - GenericRecord updatedValueRecord = MapOrderingPreservingSerDeFactory.getDeserializer(personSchemaV1, personSchemaV1) + GenericRecord updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(personSchemaV1, personSchemaV1) .deserialize(updatedValueBytes.array()); Assert.assertEquals(updatedValueRecord.get("age"), 66); @@ -266,7 +266,7 @@ public void testCollectionMerge() { stringMap.put("2", "two"); oldValueRecord.put("stringMap", stringMap); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); // Set up partial update request. Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV1); @@ -360,9 +360,8 @@ public void testCollectionMerge() { ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); // Use annotated value schema to deserialize record and the map field will be keyed in Java String type. Schema annotatedSchema = annotateValueSchema(personSchemaV1); - GenericRecord updatedValueRecord = - MapOrderingPreservingSerDeFactory.getDeserializer(annotatedSchema, annotatedSchema) - .deserialize(updatedValueBytes.array()); + GenericRecord updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(annotatedSchema, annotatedSchema) + .deserialize(updatedValueBytes.array()); Assert.assertEquals(updatedValueRecord.get("age"), 99); Assert.assertEquals(updatedValueRecord.get("name").toString(), "Francisco"); @@ -395,7 +394,7 @@ public void testCollectionMerge() { 1, newColoID); ByteBuffer updatedValueBytes2 = mergeConflictResult2.getNewValue(); - updatedValueRecord = MapOrderingPreservingSerDeFactory.getDeserializer(annotatedSchema, annotatedSchema) + updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(annotatedSchema, annotatedSchema) .deserialize(updatedValueBytes2.array()); updatedMapField = (Map) updatedValueRecord.get("stringMap"); Assert.assertEquals(updatedMapField.size(), 2); @@ -418,7 +417,7 @@ public void testWholeFieldUpdateWithEvolvedSchema() { oldValueRecord.put("name", "Kafka"); oldValueRecord.put("intArray", Arrays.asList(1, 2, 3)); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); // Set up Write Compute request. Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV2); @@ -430,7 +429,7 @@ public void testWholeFieldUpdateWithEvolvedSchema() { updateFieldWriteComputeRecord.put("favoritePet", "a random stray cat"); updateFieldWriteComputeRecord.put("stringArray", Arrays.asList("one", "two", "three")); ByteBuffer writeComputeBytes = ByteBuffer.wrap( - MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); + MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord)); // Set up current replication metadata. final long valueLevelTimestamp = 10L; @@ -536,7 +535,7 @@ public void testWholeFieldUpdateWithEvolvedSchema() { // Validate updated value. Assert.assertNotNull(mergeConflictResult.getNewValue()); ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); - GenericRecord updatedValueRecord = MapOrderingPreservingSerDeFactory.getDeserializer(personSchemaV3, personSchemaV3) + GenericRecord updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(personSchemaV3, personSchemaV3) .deserialize(updatedValueBytes.array()); Assert.assertEquals(updatedValueRecord.get("age"), 66); @@ -565,7 +564,7 @@ public void testCollectionMergeWithEvolvedSchema() { oldValueRecord.put("name", "Kafka"); oldValueRecord.put("intArray", Arrays.asList(1, 2, 3)); ByteBuffer oldValueBytes = - ByteBuffer.wrap(MapOrderingPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(personSchemaV1).serialize(oldValueRecord)); // Set up Write Compute request. Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV2); @@ -576,8 +575,8 @@ public void testCollectionMergeWithEvolvedSchema() { updateBuilder.setElementsToRemoveFromListField("stringArray", Arrays.asList("four", "five", "six")); GenericRecord updateFieldRecord = updateBuilder.build(); - ByteBuffer writeComputeBytes = ByteBuffer - .wrap(MapOrderingPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldRecord)); + ByteBuffer writeComputeBytes = + ByteBuffer.wrap(MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldRecord)); // Set up current replication metadata. final long valueLevelTimestamp = 10L; @@ -694,7 +693,7 @@ public void testCollectionMergeWithEvolvedSchema() { // Validate updated value. Assert.assertNotNull(mergeConflictResult.getNewValue()); ByteBuffer updatedValueBytes = mergeConflictResult.getNewValue(); - GenericRecord updatedValueRecord = MapOrderingPreservingSerDeFactory.getDeserializer(personSchemaV3, personSchemaV3) + GenericRecord updatedValueRecord = MapOrderPreservingSerDeFactory.getDeserializer(personSchemaV3, personSchemaV3) .deserialize(updatedValueBytes.array()); Assert.assertEquals(updatedValueRecord.get("age"), 30); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/schema/TestSchemaUtils.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/schema/TestSchemaUtils.java index b095bd91f6..d3476687e0 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/schema/TestSchemaUtils.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/schema/TestSchemaUtils.java @@ -14,7 +14,7 @@ import static com.linkedin.venice.schema.writecompute.WriteComputeConstants.SET_DIFF; import static com.linkedin.venice.schema.writecompute.WriteComputeConstants.SET_UNION; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.schema.rmd.RmdSchemaGenerator; import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; @@ -153,11 +153,11 @@ public void testAnnotateRmdSchema() { } protected RecordSerializer getSerializer(Schema writerSchema) { - return MapOrderingPreservingSerDeFactory.getAvroGenericSerializer(writerSchema); + return MapOrderPreservingSerDeFactory.getAvroGenericSerializer(writerSchema); } protected RecordDeserializer getDeserializer(Schema writerSchema, Schema readerSchema) { - return MapOrderingPreservingSerDeFactory.getAvroGenericDeserializer(writerSchema, readerSchema); + return MapOrderPreservingSerDeFactory.getAvroGenericDeserializer(writerSchema, readerSchema); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeTest.java index 1649a45362..88c08de84e 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/serializer/avro/MapOrderPreservingSerDeTest.java @@ -32,10 +32,9 @@ public class MapOrderPreservingSerDeTest { @Test public void testCollectionFieldsDeserializedInConsistentOrder() { Schema valueSchema = AvroCompatibilityHelper.parse(VALUE_SCHEMA_STR); - MapOrderPreservingSerializer serializer = - MapOrderingPreservingSerDeFactory.getSerializer(valueSchema); + MapOrderPreservingSerializer serializer = MapOrderPreservingSerDeFactory.getSerializer(valueSchema); MapOrderPreservingDeserializer deserializer = - MapOrderingPreservingSerDeFactory.getDeserializer(valueSchema, valueSchema); + MapOrderPreservingSerDeFactory.getDeserializer(valueSchema, valueSchema); List> mapEntries = Arrays.asList( new Pair<>("cat_1", 1), @@ -55,8 +54,7 @@ public void testCollectionFieldsDeserializedInConsistentOrder() { @Test public void testDeserializeEmptyCollectionFields() { Schema valueSchema = AvroCompatibilityHelper.parse(VALUE_SCHEMA_STR); - MapOrderPreservingSerializer serializer = - MapOrderingPreservingSerDeFactory.getSerializer(valueSchema); + MapOrderPreservingSerializer serializer = MapOrderPreservingSerDeFactory.getSerializer(valueSchema); MapOrderPreservingDeserializer deserializer = new MapOrderPreservingDeserializer(valueSchema, valueSchema); validateConsistentSerdeResults( diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java index da51bd546c..d1f6a7ae81 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.java @@ -5,7 +5,7 @@ import com.linkedin.davinci.schema.merge.CollectionTimestampMergeRecordHelper; import com.linkedin.davinci.schema.merge.MergeRecordHelper; import com.linkedin.davinci.schema.merge.UpdateResultStatus; -import com.linkedin.davinci.serializer.avro.MapOrderingPreservingSerDeFactory; +import com.linkedin.davinci.serializer.avro.MapOrderPreservingSerDeFactory; import com.linkedin.venice.hadoop.AbstractVeniceFilter; import com.linkedin.venice.hadoop.VenicePushJob; import com.linkedin.venice.hadoop.schema.HDFSSchemaSource; @@ -36,6 +36,10 @@ public abstract class VeniceRmdTTLFilter extends AbstractVeniceFilt private final HDFSSchemaSource schemaSource; protected final Map rmdSchemaMap; protected final Map valueSchemaMap; + /** + * TODO: we will adopt fast-avro in a next iteration after fast-avro adoption in the AAWC code path + * is fully verified. + */ private final Map> rmdDeserializerCache; private final Map> valueDeserializerCache; private final Map> rmdSerializerCache; @@ -117,22 +121,22 @@ boolean filterByTTLandMaybeUpdateValue(final INPUT_VALUE value) { RecordDeserializer generateRmdDeserializer(RmdVersionId rmdVersionId) { Schema schema = rmdSchemaMap.get(rmdVersionId); - return MapOrderingPreservingSerDeFactory.getDeserializer(schema, schema); + return MapOrderPreservingSerDeFactory.getDeserializer(schema, schema); } RecordDeserializer generateValueDeserializer(int valueSchemaId) { Schema schema = valueSchemaMap.get(valueSchemaId); - return MapOrderingPreservingSerDeFactory.getDeserializer(schema, schema); + return MapOrderPreservingSerDeFactory.getDeserializer(schema, schema); } RecordSerializer generateRmdSerializer(RmdVersionId rmdVersionId) { Schema schema = rmdSchemaMap.get(rmdVersionId); - return MapOrderingPreservingSerDeFactory.getSerializer(schema); + return MapOrderPreservingSerDeFactory.getSerializer(schema); } RecordSerializer generateValueSerializer(int valueSchemaId) { Schema schema = valueSchemaMap.get(valueSchemaId); - return MapOrderingPreservingSerDeFactory.getSerializer(schema); + return MapOrderPreservingSerDeFactory.getSerializer(schema); } protected abstract int getSchemaId(final INPUT_VALUE value); diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactory.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactory.java index 495bed4905..4c6fa5f0b6 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactory.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactory.java @@ -93,7 +93,7 @@ public static void cacheFastAvroGenericDeserializer(Schema writerSchema, Schema private static void tryCacheFastGenericDeserializer(Schema writerSchema, Schema readerSchema) { FastDeserializer fastDeserializer = cache.getFastGenericDeserializer(writerSchema, readerSchema); - if (fastDeserializer instanceof FastSerdeCache.FastDeserializerWithAvroGenericImpl) { + if (!fastDeserializer.hasDynamicClassGenerationDone()) { throw new VeniceException("Failed to generate fast generic de-serializer for Avro schema " + writerSchema); } } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/SerializerDeserializerFactory.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/SerializerDeserializerFactory.java index 309ba56f02..241212a9c1 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/SerializerDeserializerFactory.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/SerializerDeserializerFactory.java @@ -17,7 +17,7 @@ */ public class SerializerDeserializerFactory { // Class works as the key of caching map for SerializerDeserializerFactory - protected static class SchemaPairAndClassContainer { + public static class SchemaPairAndClassContainer { public Schema writer; public Schema reader; public Class c; diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactoryTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactoryTest.java index 3316ca57d8..a77b73898e 100644 --- a/internal/venice-client-common/src/test/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactoryTest.java +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/serializer/FastSerializerDeserializerFactoryTest.java @@ -1,5 +1,6 @@ package com.linkedin.venice.serializer; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1; import com.linkedin.venice.utils.TestUtils; import java.util.concurrent.ExecutorService; @@ -37,4 +38,10 @@ public void concurrentFastAvroVerification() throws InterruptedException { TestUtils.shutdownExecutor(executor); } } + + @Test + public void checkFastDeserializerGeneration() { + FastSerializerDeserializerFactory + .cacheFastAvroGenericDeserializer(KafkaMessageEnvelope.SCHEMA$, KafkaMessageEnvelope.SCHEMA$, 10000); + } }