Skip to content

Commit

Permalink
[controller] Merge field-level props when generating superset schema
Browse files Browse the repository at this point in the history
  • Loading branch information
gaojieliu committed Nov 30, 2023
1 parent 59bcfe4 commit bae4694
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,32 +107,51 @@ private static Schema unionSchema(Schema s1, Schema s2) {
return Schema.createUnion(combinedSchema);
}

private static FieldBuilder deepCopySchemaField(Schema.Field field) {
FieldBuilder fieldBuilder = AvroCompatibilityHelper.newField(null)
.setName(field.name())
.setSchema(field.schema())
.setDoc(field.doc())
.setOrder(field.order());
field.getObjectProps().forEach((k, ignored) -> {
String propValue = field.getProp(k);
if (propValue != null) {
fieldBuilder.addProp(k, propValue);
}
});

// set default as AvroCompatibilityHelper builder might drop defaults if there is type mismatch
if (field.hasDefaultValue()) {
fieldBuilder.setDefault(getFieldDefault(field));
}

return fieldBuilder;
}

private static List<Schema.Field> mergeFieldSchemas(Schema s1, Schema s2) {
List<Schema.Field> fields = new ArrayList<>();

for (Schema.Field f1: s1.getFields()) {
Schema.Field f2 = s2.getField(f1.name());
FieldBuilder fieldBuilder = AvroCompatibilityHelper.newField(f1);

// set default as AvroCompatibilityHelper builder might drop defaults if there is type mismatch
if (f1.hasDefaultValue()) {
fieldBuilder.setDefault(getFieldDefault(f1));
}
FieldBuilder fieldBuilder = deepCopySchemaField(f1);
if (f2 != null) {
fieldBuilder.setSchema(generateSuperSetSchema(f1.schema(), f2.schema()))
.setDoc(f1.doc() != null ? f1.doc() : f2.doc());
// merge props from f2
f2.getObjectProps().forEach((k, ignored) -> {
String propValue = f2.getProp(k);
if (propValue != null) {
fieldBuilder.addProp(k, propValue);
}
});
}
fields.add(fieldBuilder.build());
}

for (Schema.Field f2: s2.getFields()) {
if (s1.getField(f2.name()) == null) {
FieldBuilder fieldBuilder = AvroCompatibilityHelper.newField(f2);
if (f2.hasDefaultValue()) {
// set default as AvroCompatibilityHelper builder might drop defaults if there is type mismatch
fieldBuilder.setDefault(getFieldDefault(f2));
}
fields.add(fieldBuilder.build());
fields.add(deepCopySchemaField(f2).build());
}
}
return fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,30 @@ public void testIsSupersetSchemaNestedRecordSchema() {
Assert.assertFalse(AvroSupersetSchemaUtils.isSupersetSchema(schema1, schema2));
}

@Test
public void testSupersetSchemaContainsMergeFieldProps() {
String valueSchemaStr1 = "{\n" + " \"name\": \"TestRecord\",\n" + " \"type\": \"record\",\n" + " \"fields\": [\n"
+ " {\"name\": \"int_field\", \"type\": \"int\", \"doc\": \"int field\", \"prop1\": \"\\\"prop1_v1\\\"\"}\n"
+ " ],\n" + " \"schema_prop\": \"\\\"schema_prop_v1\\\"\"\n" + "}";
String valueSchemaStr2 = "{\n" + " \"name\": \"TestRecord\",\n" + " \"type\": \"record\",\n" + " \"fields\": [\n"
+ " {\"name\": \"int_field\", \"type\": \"int\", \"doc\": \"int field\", \"prop1\": \"\\\"prop1_v2\\\"\", \"prop2\": \"\\\"prop2_v1\\\"\"},\n"
+ " {\"name\": \"string_field\", \"type\": \"string\", \"doc\": \"string field\", \"prop3\": \"\\\"prop3_v1\\\"\", \"prop2\": \"\\\"prop2_v2\\\"\"}\n"
+ " ],\n" + " \"schema_prop\": \"\\\"schema_prop_v2\\\"\"\n" + "}";

Schema schema1 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr1);
Schema schema2 = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(valueSchemaStr2);

Schema supersetSchema = AvroSupersetSchemaUtils.generateSuperSetSchema(schema1, schema2);

Schema.Field intField = supersetSchema.getField("int_field");
Schema.Field stringField = supersetSchema.getField("string_field");

Assert.assertEquals(intField.getProp("prop1"), "prop1_v2");
Assert.assertEquals(intField.getProp("prop2"), "prop2_v1");
Assert.assertEquals(stringField.getProp("prop3"), "prop3_v1");
Assert.assertEquals(stringField.getProp("prop2"), "prop2_v2");
}

@Test
public void testGetSupersetSchemaFromSchemaResponse() {
MultiSchemaResponse.Schema[] schemas = new MultiSchemaResponse.Schema[3];
Expand Down

0 comments on commit bae4694

Please sign in to comment.