Skip to content

Commit

Permalink
[server] Best effort to extract optional collection type for type val…
Browse files Browse the repository at this point in the history
…idation before Active/Active partial update collection operation (#736)

This PR removes the strict optional-collection type check that was performed before applying a collection partial update in an Active/Active store.
Previously, the check required the union schema to be exactly [null, collection]. This PR instead uses the SchemaUtils.unwrapOptionalUnion() API to retrieve the collection type.
  • Loading branch information
sixpluszero authored Nov 3, 2023
1 parent f27bcc1 commit 7f5d148
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,17 @@ public static Schema.Type unwrapOptionalUnion(Schema fieldSchema) {
return fieldSchema.getType();
}
}

/**
* This method checks whether the field is of the expected type. It relies on {@link SchemaUtils#unwrapOptionalUnion(Schema)}
* to retrieve the type from the field if the incoming field schema is a union.
* If the expected type is not found, or the incoming union is not valid for processing, it throws an {@link IllegalStateException}.
*/
public static void validateFieldSchemaType(String fieldName, Schema fieldSchema, Schema.Type expectedType) {
  // Resolve the effective type of the field; unions are handled by unwrapOptionalUnion.
  final Schema.Type resolvedType = SchemaUtils.unwrapOptionalUnion(fieldSchema);
  if (resolvedType == expectedType) {
    // The field has the expected type, nothing to report.
    return;
  }
  // Mismatch: report the field name together with the expected and the resolved type.
  final String message =
      String.format("Expect field %s to be of type %s. But got: %s", fieldName, expectedType, resolvedType);
  throw new IllegalStateException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.avro.api.PrimitiveLongList;
import com.linkedin.avro.fastserde.primitive.PrimitiveLongArrayList;
import com.linkedin.davinci.schema.SchemaUtils;
import com.linkedin.davinci.utils.IndexedHashMap;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp;
Expand Down Expand Up @@ -39,7 +40,7 @@ public UpdateResultStatus handlePutList(
if (ignoreIncomingRequest(putTimestamp, coloID, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.ARRAY, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.ARRAY);

// Current list will be updated.
final long currTopLevelTimestamp = collectionFieldRmd.getTopLevelFieldTimestamp();
Expand Down Expand Up @@ -170,44 +171,6 @@ private void deDupListFromEnd(List<Object> list) {
deDupSet.clear(); // Try to be more GC friendly.
}

/**
 * Verifies that the schema of the given record field has the expected type.
 *
 * @param currValueRecordField field whose schema is inspected
 * @param expectType type the field schema must have
 * @param nullableAllowed when {@code true} and the field schema is a union, the union is validated
 *        by {@code validateFieldSchemaIsNullableType} instead of being compared directly
 * @throws IllegalStateException if the field schema does not have the expected type
 */
private void validateFieldSchemaType(
    Schema.Field currValueRecordField,
    Schema.Type expectType,
    boolean nullableAllowed) {
  final Schema schema = currValueRecordField.schema();
  final Schema.Type actualType = schema.getType();
  final boolean isUnion = actualType == Schema.Type.UNION;

  if (isUnion && nullableAllowed) {
    // A union is only acceptable in the nullable form; delegate the detailed checks.
    validateFieldSchemaIsNullableType(schema, expectType);
  } else if (actualType != expectType) {
    final String message = String.format(
        "Expect field %s to be of type %s. But got: %s",
        currValueRecordField.name(),
        expectType,
        actualType);
    throw new IllegalStateException(message);
  }
}

/**
 * Verifies that the given schema is a nullable type, i.e. a union of exactly two branches where
 * the first branch is null and the second branch has the expected type.
 *
 * @param fieldSchema schema to validate
 * @param expectType type expected for the second branch of the union
 * @throws IllegalStateException if any of the conditions above does not hold
 */
private void validateFieldSchemaIsNullableType(Schema fieldSchema, Schema.Type expectType) {
  // The checks run in order; the first failing one selects the error message prefix.
  final String violation;
  if (fieldSchema.getType() != Schema.Type.UNION) {
    violation = "Expect a union. Got field schema: ";
  } else if (fieldSchema.getTypes().size() != 2) {
    violation = "Expect a union of size 2. Got field schema: ";
  } else if (fieldSchema.getTypes().get(0).getType() != Schema.Type.NULL) {
    violation = "Expect the first element in the union to be null. Got field schema: ";
  } else if (fieldSchema.getTypes().get(1).getType() != expectType) {
    violation = "Expect the second element in the union to be the expected type. Got field schema: ";
  } else {
    // Union of [null, expected type]: valid.
    return;
  }
  throw new IllegalStateException(violation + fieldSchema);
}

@Override
public UpdateResultStatus handlePutMap(
final long putTimestamp,
Expand All @@ -219,7 +182,7 @@ public UpdateResultStatus handlePutMap(
if (ignoreIncomingRequest(putTimestamp, coloID, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.MAP, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.MAP);
collectionFieldRmd.setTopLevelFieldTimestamp(putTimestamp);
collectionFieldRmd.setTopLevelColoID(coloID);
IndexedHashMap<String, Object> toPutMap;
Expand Down Expand Up @@ -318,7 +281,7 @@ public UpdateResultStatus handleDeleteList(
if (ignoreIncomingRequest(deleteTimestamp, coloID, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.ARRAY, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.ARRAY);
// Current list will be deleted (partially or completely).
final int currPutOnlyPartLength = collectionFieldRmd.getPutOnlyPartLength();
collectionFieldRmd.setTopLevelFieldTimestamp(deleteTimestamp);
Expand Down Expand Up @@ -367,7 +330,7 @@ public UpdateResultStatus handleDeleteMap(
if (ignoreIncomingRequest(deleteTimestamp, coloID, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.MAP, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.MAP);
// Handle Delete on a map that is in the collection-merge mode.
final int originalPutOnlyPartLength = collectionFieldRmd.getPutOnlyPartLength();
final long originalTopLevelFieldTimestamp = collectionFieldRmd.getTopLevelFieldTimestamp();
Expand Down Expand Up @@ -419,7 +382,7 @@ public UpdateResultStatus handleModifyList(
if (ignoreIncomingRequest(modifyTimestamp, Integer.MIN_VALUE, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.ARRAY, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.ARRAY);
Set<Object> toAddElementSet = new HashSet<>(toAddElements);
Set<Object> toRemoveElementSet = new HashSet<>(toRemoveElements);
removeIntersectionElements(toAddElementSet, toRemoveElementSet);
Expand Down Expand Up @@ -751,7 +714,7 @@ public UpdateResultStatus handleModifyMap(
if (ignoreIncomingRequest(modifyTimestamp, Integer.MIN_VALUE, collectionFieldRmd)) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
validateFieldSchemaType(currValueRecordField, Schema.Type.MAP, true);
SchemaUtils.validateFieldSchemaType(currValueRecordField.name(), currValueRecordField.schema(), Schema.Type.MAP);
if (toRemoveKeys.isEmpty() && newEntries.isEmpty()) {
return UpdateResultStatus.NOT_UPDATED_AT_ALL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ public void testUnwrapOptionalUnion(Schema type) {
assertTrue(SchemaUtils.unwrapOptionalUnion(nullAndTypeAndOtherTypeUnion) == UNION);
}

/**
 * Exercises {@link SchemaUtils#validateFieldSchemaType(String, Schema, Schema.Type)} with unions built
 * around the provided schema: optional unions in either order and a single-branch union must be accepted,
 * while unions that carry an additional non-null branch must be rejected with {@link IllegalStateException}.
 */
@Test(dataProviderClass = DataProviderUtils.class, dataProvider = "All-Avro-Schemas-Except-Null-And-Union")
public void testValidateFieldSchemaType(Schema type) {
  final Schema.Type expectedType = type.getType();

  // Accepted shapes: [null, type], [type, null] and [type].
  Schema nullAndTypeUnion = Schema.createUnion(Schema.create(NULL), type);
  SchemaUtils.validateFieldSchemaType("field", nullAndTypeUnion, expectedType);

  Schema typeAndNullUnion = Schema.createUnion(type, Schema.create(NULL));
  SchemaUtils.validateFieldSchemaType("field", typeAndNullUnion, expectedType);

  Schema singleTypeOnlyUnion = Schema.createUnion(type);
  SchemaUtils.validateFieldSchemaType("field", singleTypeOnlyUnion, expectedType);

  // Pick a second branch whose type differs from the type under test.
  Schema other = Schema.create(expectedType == INT ? LONG : INT);

  // Rejected shapes: assert the specific exception type so that an unrelated failure
  // (for example a NullPointerException) cannot make the test pass by accident.
  Schema typeAndOtherTypeUnion = Schema.createUnion(type, other);
  Assert.assertThrows(
      IllegalStateException.class,
      () -> SchemaUtils.validateFieldSchemaType("field", typeAndOtherTypeUnion, expectedType));

  Schema nullAndTypeAndOtherTypeUnion = Schema.createUnion(Schema.create(NULL), type, other);
  Assert.assertThrows(
      IllegalStateException.class,
      () -> SchemaUtils.validateFieldSchemaType("field", nullAndTypeAndOtherTypeUnion, expectedType));
}

/**
 * Obtains the generic-record serializer for the given writer schema from
 * {@code MapOrderPreservingSerDeFactory}.
 *
 * @param writerSchema schema passed to the factory
 * @return the serializer returned by the factory
 */
protected RecordSerializer<GenericRecord> getSerializer(Schema writerSchema) {
  final RecordSerializer<GenericRecord> serializer =
      MapOrderPreservingSerDeFactory.getAvroGenericSerializer(writerSchema);
  return serializer;
}
Expand Down

0 comments on commit 7f5d148

Please sign in to comment.