Skip to content

Commit

Permalink
Address feedback, rebase to work with structured Key type, add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan committed Dec 24, 2024
1 parent 5fb5687 commit 1973fe0
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private static String applyTransform(String value, String transform) {
res = res.replace("\n", "");
break;
default:
log.info("Transformation function '{}' not found", f);
log.warn("Transformation function '{}' not found", f);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.hoptimator.util;

import java.util.Collections;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -19,13 +19,13 @@ private DataTypeUtils() {

/**
* Flattens nested structs and complex arrays.
*
* Nested structs like `FOO Row(BAR Row(QUX VARCHAR)))` are promoted to
*
* Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to
* top-level fields like `FOO$BAR$QUX VARCHAR`.
*
* Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are
* unchanged.
*
*
*/
public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
if (!dataType.isStruct()) {
Expand Down Expand Up @@ -56,7 +56,7 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data
/** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */
public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
if (!dataType.isStruct()) {
throw new IllegalArgumentException("Can only unflatten a struct type.");
throw new IllegalArgumentException("Can only unflatten a struct type.");
}
Node root = new Node();
for (RelDataTypeField field : dataType.getFieldList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import javax.sql.DataSource;

import org.apache.calcite.adapter.jdbc.JdbcConvention;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -14,7 +15,8 @@
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.util.Litmus;
import org.apache.calcite.util.Pair;

Expand All @@ -27,6 +29,7 @@
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.SqlDialect;
import com.linkedin.hoptimator.util.ConnectionService;
import com.linkedin.hoptimator.util.DataTypeUtils;
import com.linkedin.hoptimator.util.DeploymentService;


Expand Down Expand Up @@ -93,12 +96,14 @@ public void setSink(String database, List<String> path, RelDataType rowType, Map
private Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
Map<String, String> newOptions = new HashMap<>(options);

RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));

// If the keys are already set, don't overwrite them
if (newOptions.containsKey(KEY_OPTION)) {
return newOptions;
}
String keyString = rowType.getFieldList().stream()
.map(RelDataTypeField::getName)
String keyString = flattened.getFieldList().stream()
.map(x -> x.getName().replaceAll("\\$", "_"))
.filter(name -> name.startsWith(KEY_PREFIX))
.collect(Collectors.joining(";"));
if (!keyString.isEmpty()) {
Expand All @@ -125,7 +130,7 @@ public Pipeline pipeline() throws SQLException {
Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
ConnectionService.configure(sink, Sink.class);
Job job = new Job(sink, sql());
RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
deployables.addAll(DeploymentService.deployables(sink, Sink.class));
deployables.addAll(DeploymentService.deployables(job, Job.class));
return new Pipeline(deployables);
Expand All @@ -144,8 +149,8 @@ public Function<SqlDialect, String> sql() throws SQLException {
Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
Map<String, String> sinkConfigs = ConnectionService.configure(sink, Sink.class);
script = script.connector(sink.table(), targetRowType, sinkConfigs);
script = script.insert(sink.table(), sink.schema(), query, targetFields);
RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
script = script.insert(sink.table(), query, targetFields);
RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
return script.seal();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.linkedin.hoptimator.util.DataTypeUtils;

import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.Convention;
Expand All @@ -35,14 +31,16 @@
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;

import com.linkedin.hoptimator.util.DataTypeUtils;


public final class PipelineRules {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ default ScriptImplementor database(String database) {
}

/** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(String name, String schema, RelNode relNode,
ImmutableList<Pair<Integer, String>> targetFields) {
return with(new InsertImplementor(name, schema, relNode, targetFields));
default ScriptImplementor insert(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
return with(new InsertImplementor(name, relNode, targetFields));
}

/** Render the script as DDL/SQL in the default dialect */
Expand Down Expand Up @@ -171,7 +170,7 @@ public void implement(SqlWriter w) {
if (select.getSelectList() != null) {
select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
}
select.accept(UNFLATTEN_MEMBER_ACCESS).unparse(w, 0, 0);
select.accept(new UnflattenMemberAccess(this)).unparse(w, 0, 0);
}

// A `ROW(...)` operator which will unparse as just `(...)`.
Expand All @@ -191,17 +190,31 @@ public SqlNode visit(SqlCall call) {
}
};

// a shuttle that replaces `FOO$BAR` with `FOO.BAR`
private static final SqlShuttle UNFLATTEN_MEMBER_ACCESS = new SqlShuttle() {
private class UnflattenMemberAccess extends SqlShuttle {
private final Set<String> sinkFieldList;

UnflattenMemberAccess(QueryImplementor outer) {
this.sinkFieldList = outer.relNode.getRowType().getFieldList()
.stream()
.map(RelDataTypeField::getName)
.collect(Collectors.toSet());
}

// SqlShuttle gets called for every field in SELECT and every table name in FROM alike
// For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR`
@Override
public SqlNode visit(SqlIdentifier id) {
SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
.flatMap(x -> Stream.of(x.split("\\$")))
.collect(Collectors.toList()), SqlParserPos.ZERO);
id.assignNamesFrom(replacement);
if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) {
id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO));
} else {
SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
.flatMap(x -> Stream.of(x.split("\\$")))
.collect(Collectors.toList()), SqlParserPos.ZERO);
id.assignNamesFrom(replacement);
}
return id;
}
};
}
}

/**
Expand Down Expand Up @@ -270,14 +283,11 @@ public void implement(SqlWriter w) {
* */
class InsertImplementor implements ScriptImplementor {
private final String name;
private final String schema;
private final RelNode relNode;
private final ImmutableList<Pair<Integer, String>> targetFields;

public InsertImplementor(String name, String schema, RelNode relNode,
ImmutableList<Pair<Integer, String>> targetFields) {
public InsertImplementor(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
this.name = name;
this.schema = schema;
this.relNode = relNode;
this.targetFields = targetFields;
}
Expand All @@ -286,29 +296,21 @@ public InsertImplementor(String name, String schema, RelNode relNode,
public void implement(SqlWriter w) {
w.keyword("INSERT INTO");
(new IdentifierImplementor(name)).implement(w);
// SqlWriter.Frame frame1 = w.startList("(", ")");
RelNode project = dropFields(schema, relNode, targetFields);
RelNode project = dropFields(relNode, targetFields);
(new ColumnListImplementor(project.getRowType())).implement(w);
// w.endList(frame1);
(new QueryImplementor(project)).implement(w);
w.literal(";");
}

// Drops NULL fields
// Drops additional fields by schema
private static RelNode dropFields(String schema, RelNode relNode,
ImmutableList<Pair<Integer, String>> targetFields) {
// Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
private static RelNode dropFields(RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
List<Integer> cols = new ArrayList<>();
int i = 0;
Set<String> targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet());
for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
// TODO: Need a better way to dynamically modify the script implementer based on the schema
if (schema.startsWith("VENICE")
&& !targetFieldNames.contains(field.getName())) {
i++;
continue;
}
if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
if (targetFieldNames.contains(field.getName())
&& !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
cols.add(i);
}
i++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
import java.util.List;
import java.util.stream.Collectors;

import com.linkedin.hoptimator.util.planner.ScriptImplementor;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Litmus;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.linkedin.hoptimator.util.planner.ScriptImplementor;


public class TestDataTypeUtils {
Expand All @@ -33,7 +32,7 @@ public void flattenUnflatten() {
builder3.add("FOO", builder1.build());
builder3.add("BAR", builder2.build());
RelDataType rowType = builder3.build();
Assertions.assertEquals(2, rowType.getFieldList().size());
Assertions.assertEquals(2, rowType.getFieldList().size());
RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
Assertions.assertEquals(3, flattenedType.getFieldList().size());
List<String> flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName())
Expand Down Expand Up @@ -65,7 +64,7 @@ public void flattenNestedArrays() {
builder3.add("FOO", typeFactory.createArrayType(builder1.build(), -1));
builder3.add("BAR", typeFactory.createArrayType(builder2.build(), -1));
RelDataType rowType = builder3.build();
Assertions.assertEquals(2, rowType.getFieldList().size());
Assertions.assertEquals(2, rowType.getFieldList().size());
RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
Assertions.assertEquals(2, flattenedType.getFieldList().size());
List<String> flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName())
Expand All @@ -77,5 +76,5 @@ public void flattenNestedArrays() {
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ANY ARRAY, "
+ "`BAR` ANY ARRAY) WITH ();", flattenedConnector,
"Flattened connector should have simplified arrays");
}
}
}
}
1 change: 1 addition & 0 deletions hoptimator-venice/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {

dependencies {
implementation project(':hoptimator-avro')
implementation project(':hoptimator-util')
implementation libs.calcite.core
implementation(libs.venice) {
// Venice pulls in snakeyaml v2.0 which has conflicting APIs with 1.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ protected String discoverLeaderController() {
URL controllerUrl = new URL(super.discoverLeaderController());
return controllerUrl.getProtocol() + "://localhost:" + controllerUrl.getPort();
} catch (MalformedURLException e) {
throw new RuntimeException(e);
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.apache.calcite.schema.SchemaPlus;


/** JDBC driver for Venice topics. */
/** JDBC driver for Venice stores. */
public class VeniceDriver extends Driver {

public static final String CATALOG_NAME = "VENICE";
Expand Down
Loading

0 comments on commit 1973fe0

Please sign in to comment.