Skip to content

Commit

Permalink
Use implicit row constructors
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Apr 2, 2024
1 parent ad8449f commit e045a52
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
.filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
.collect(Collectors.toList());
return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields);
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
dataType.isNullable());
} else {
switch (dataType.getSqlTypeName()) {
case INTEGER:
Expand All @@ -42,6 +43,14 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
case CHAR:
return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
case BOOLEAN:
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
case ARRAY:
return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
dataType.isNullable());
// TODO support map types
// case MAP:
// return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
case UNKNOWN:
case NULL:
return Schema.createUnion(Schema.create(Schema.Type.NULL));
Expand All @@ -56,14 +65,18 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro
return avro(namespace, name, relProtoDataType.apply(factory));
}

private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) {
if (nullable) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType));
return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
} else {
return Schema.create(rawType);
return schema;
}
}

/**
 * Builds the Avro schema for a primitive type and hands it to
 * {@code createAvroSchemaWithNullability} together with the nullability flag.
 *
 * @param rawType  Avro primitive type to create a schema for
 * @param nullable forwarded unchanged to {@code createAvroSchemaWithNullability}
 * @return whatever {@code createAvroSchemaWithNullability} returns for the primitive schema
 */
private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) {
  Schema primitive = Schema.create(rawType);
  return createAvroSchemaWithNullability(primitive, nullable);
}

public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
Expand All @@ -74,17 +87,24 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
.filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
.collect(Collectors.toList()));
case INT:
// schema.isNullable() should be false for basic types iiuc
return createRelTypeWithNullability(typeFactory, SqlTypeName.INTEGER, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.INTEGER);
case LONG:
return createRelTypeWithNullability(typeFactory, SqlTypeName.BIGINT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.BIGINT);
case ENUM:
case FIXED:
case STRING:
return createRelTypeWithNullability(typeFactory, SqlTypeName.VARCHAR, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.VARCHAR);
case FLOAT:
return createRelTypeWithNullability(typeFactory, SqlTypeName.FLOAT, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.FLOAT);
case DOUBLE:
return createRelTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, schema.isNullable());
return createRelType(typeFactory, SqlTypeName.DOUBLE);
case BOOLEAN:
return createRelType(typeFactory, SqlTypeName.BOOLEAN);
case ARRAY:
return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
// TODO support map types
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
case UNION:
if (schema.isNullable() && schema.getTypes().size() == 2) {
Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
Expand All @@ -102,9 +122,9 @@ public static RelDataType rel(Schema schema) {
return rel(schema, DataType.DEFAULT_TYPE_FACTORY);
}

private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) {
private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) {
RelDataType rawType = typeFactory.createSqlType(typeName);
return typeFactory.createTypeWithNullability(rawType, nullable);
return typeFactory.createTypeWithNullability(rawType, false);
}

public static RelProtoDataType proto(Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.SqlWriter;
//import org.apache.calcite.sql.SqlWriterConfig;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlRowTypeNameSpec;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.sql.fun.SqlRowOperator;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.SqlShuttle;

import java.util.Map;
import java.util.List;
Expand Down Expand Up @@ -129,9 +138,31 @@ public QueryImplementor(RelNode relNode) {
public void implement(SqlWriter w) {
RelToSqlConverter converter = new RelToSqlConverter(w.getDialect());
SqlImplementor.Result result = converter.visitRoot(relNode);
w.literal(result.asSelect().toSqlString(w.getDialect()).getSql());
SqlSelect select = result.asSelect();
if (select.getSelectList() != null) {
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
}
w.literal(select.toSqlString(w.getDialect()).getSql());
}
}

// A `ROW(...)` operator which will unparse as just `(...)`.
private final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name

/**
 * A shuttle that replaces {@code ROW(...)} with just {@code (...)}.
 *
 * <p>Every call is rebuilt from its (recursively rewritten) operands. Calls whose kind is
 * {@code ROW} or {@code COLUMN_LIST}, or whose operator is a {@link SqlRowOperator}, and which
 * have more than one operand, are re-created with {@code IMPLIED_ROW_OPERATOR}; every other call
 * is re-created with its own operator.
 *
 * <p>Null operands are passed through untouched, and the original call's function quantifier
 * is carried over when the call is re-created with its own operator.
 */
private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() {
  @Override
  public SqlNode visit(SqlCall call) {
    // Rewrite children first so nested row constructors are handled bottom-up.
    // Operand lists may contain nulls (optional clauses); leave those as-is instead of
    // dereferencing them.
    List<SqlNode> operands = call.getOperandList().stream()
        .map(x -> x == null ? null : x.accept(this))
        .collect(Collectors.toList());
    boolean isRowConstructor = call.getKind() == SqlKind.ROW
        || call.getKind() == SqlKind.COLUMN_LIST
        || call.getOperator() instanceof SqlRowOperator;
    // Only rows with more than one operand are rewritten; single-operand rows keep their operator.
    if (isRowConstructor && operands.size() > 1) {
      return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands);
    } else {
      // Keep the quantifier (e.g. DISTINCT) that is stored on the call rather than in its operands.
      return call.getOperator().createCall(call.getFunctionQuantifier(), call.getParserPosition(),
          operands.toArray(new SqlNode[0]));
    }
  }
};
}

/**
* Implements a CREATE TABLE...WITH... DDL statement.
Expand Down Expand Up @@ -291,14 +322,18 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) {
.map(x -> toSpec(x))
.collect(Collectors.toList());
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO));
} if (dataType.getComponentType() != null) {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec(
dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO),
SqlParserPos.ZERO));
} else {
return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO));
}
}

private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) {
if (!dataType.isNullable()) {
return spec.withNullable(true);
return spec.withNullable(false);
} else {
// we don't want "VARCHAR NULL", only "VARCHAR NOT NULL"
return spec;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.hoptimator.catalog;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.Litmus;
import org.apache.avro.Schema;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;

/** Round-trip checks between Avro schemas and Calcite row types for nested records. */
public class AvroConverterTest {

  /** Returns the second member of a union schema. */
  private static Schema secondBranch(Schema union) {
    return union.getTypes().get(1);
  }

  @Test
  public void convertsNestedSchemas() {
    String schemaString = "{\"type\":\"record\",\"name\":\"E\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"h\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"H\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"A\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}]}]}]}]}";

    // Outer record "E": Avro -> rel.
    Schema outerAvro = new Schema.Parser().parse(schemaString);
    RelDataType outerRel = AvroConverter.rel(outerAvro);
    assertEquals(outerRel.toString(), outerRel.getFieldCount(), outerAvro.getFields().size());
    assertTrue(outerRel.toString(), null != outerRel.getField("h", false, false));

    // Field "h": a union of null and record "H".
    RelDataType middleRel = outerRel.getField("h", false, false).getType();
    assertTrue(middleRel.toString(), middleRel.isNullable());
    Schema middleAvro = secondBranch(outerAvro.getField("h").schema());
    assertEquals(middleRel.toString(), middleRel.getFieldCount(), middleAvro.getFields().size());
    assertTrue(middleRel.toString(), null != middleRel.getField("A", false, false));

    // Field "A": a union of null and an empty record.
    RelDataType innerRel = middleRel.getField("A", false, false).getType();
    assertTrue(innerRel.toString(), innerRel.isNullable());
    Schema innerAvro = secondBranch(middleAvro.getField("A").schema());
    assertEquals(innerRel.toString(), innerRel.getFieldCount(), innerAvro.getFields().size());

    // rel -> Avro for each of the three levels.
    Schema outerBack = AvroConverter.avro("NS", "R", outerRel);
    assertTrue("!avroSchema4.isNullable()", !outerBack.isNullable());
    assertEquals(outerBack.toString(), outerBack.getFields().size(), outerRel.getFieldCount());

    Schema middleBack = AvroConverter.avro("NS", "R", middleRel);
    assertTrue("avroSchema5.isNullable()", middleBack.isNullable());
    assertEquals(middleBack.toString(), secondBranch(middleBack).getFields().size(), middleRel.getFieldCount());

    Schema innerBack = AvroConverter.avro("NS", "R", innerRel);
    assertEquals(innerBack.toString(), secondBranch(innerBack).getFields().size(), innerRel.getFieldCount());

    // Converting the regenerated outer schema back must give a type equal to the first conversion.
    RelDataType roundTripped = AvroConverter.rel(outerBack);
    assertTrue("types match", RelOptUtil.eq("rel4", roundTripped, "rel1", outerRel, Litmus.THROW));
  }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.linkedin.hoptimator.planner;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.dialect.AnsiSqlDialect;
import org.apache.calcite.util.Litmus;

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.ResourceProvider;
Expand Down Expand Up @@ -77,6 +79,7 @@ public ScriptImplementor query() {

/**
 * Script ending in INSERT INTO ...
 *
 * <p>Before the script is built, the sink's row type is compared with this object's
 * {@code rowType()} through {@code RelOptUtil.eq}, using the sink name and "subscription" as the
 * labels for the two types. The boolean result of that call is not used here.
 * NOTE(review): with {@code Litmus.THROW} a mismatch presumably surfaces as a thrown error
 * rather than a {@code false} return -- confirm against the Calcite Litmus documentation.
 *
 * @param sink target table; its database, name and row type are read
 * @return the result of chaining {@code database(...)}, {@code with(sink)} and
 *     {@code insert(...)} of {@code relNode} onto {@code script}
 */
public ScriptImplementor insertInto(HopTable sink) {
// Row-type check between sink and query; the return value is deliberately ignored.
RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW);
return script.database(sink.database()).with(sink)
.insert(sink.database(), sink.name(), relNode);
}
Expand Down

0 comments on commit e045a52

Please sign in to comment.