Skip to content

Commit

Permalink
[LI] Make MergeHiveSchemaWithAvro not reordering avro optional union … (
Browse files Browse the repository at this point in the history
#153)

* [LI] Make MergeHiveSchemaWithAvro not reordering avro optional union field when it's it's merging a hive primitive with an avro optional union
  • Loading branch information
rzhang10 authored Mar 15, 2024
1 parent 16e05f7 commit d581a69
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public static boolean nullExistInUnion(Schema schema) {
return false;
}

public static int getNullIndexInUnion(Schema schema) {
Preconditions.checkArgument(schema.getType() == UNION,
"Expected union schema but was passed: %s", schema);
for (int i = 0; i < schema.getTypes().size(); i++) {
if (schema.getTypes().get(i).getType() == Schema.Type.NULL) {
return i;
}
}
// which means null is not present in the union
return -1;
}

public static Schema toOption(Schema schema) {
return toOption(schema, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) {
boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL;
if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) {
return schema;
} else if (!isNullFirstOption && !defaultValue.equals(JsonProperties.NULL_VALUE)) {
return schema;
} else {
return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0));
}
Expand Down Expand Up @@ -143,10 +145,14 @@ public Schema union(UnionTypeInfo union, Schema partner, List<Schema> results) {
@Override
public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) {
boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
boolean nullShouldBeSecondElementInOptionalUnionSchema = partner != null &&
shouldResultBeOptional && AvroSchemaUtil.getNullIndexInUnion(partner) == 1;
Schema hivePrimitive = hivePrimitiveToAvro(primitive);
// if there was no matching Avro primitive, use the Hive primitive
Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner);
return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
Schema result =
partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner);
return shouldResultBeOptional ? AvroSchemaUtil.toOption(result,
nullShouldBeSecondElementInOptionalUnionSchema) : result;
}

private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.internal.JacksonUtils;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
Expand Down Expand Up @@ -277,6 +281,32 @@ public void shouldReorderOptionalSchemaToMatchDefaultValue() {
assertSchema(avro, merge(hive, avro));
}

// this test record level default value is valid with regard to inner field's optional union schema order
@Test
public void shouldReorderOptionalSchemaToMatchDefaultValue2() {
String hive = "struct<inner:struct<f1:string,f2:string>>";

Schema inner = SchemaBuilder.record("INNER").fields()
.name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))
.noDefault()
.name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL)))
.noDefault()
.endRecord();

GenericData.Record recdef = new GenericRecordBuilder(inner).set("f1", null).set("f2", "foo")
.build();

Schema avro2 = SchemaBuilder.record("OUTER").fields()
.name("inner").type().record("INNER").fields()
.name("f1").type(Schema.createUnion(Schema.create(Type.NULL), Schema.create(Type.STRING)))
.noDefault()
.name("f2").type(Schema.createUnion(Schema.create(Type.STRING), Schema.create(Type.NULL)))
.noDefault()
.endRecord().recordDefault(recdef).endRecord();

assertSchema(avro2, merge(hive, avro2));
}

@Rule
public ExpectedException thrown = ExpectedException.none();

Expand Down

0 comments on commit d581a69

Please sign in to comment.