Skip to content

Commit

Permalink
Fix target column inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
jogrogan committed Dec 23, 2024
1 parent bb90ea2 commit 8c6b927
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;


/** JDBC driver with fake in-memory data. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeSystem;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.hoptimator.util.planner;

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -9,12 +10,13 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
Expand All @@ -35,7 +37,9 @@
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -285,17 +289,58 @@ private static RelNode dropFields(String schema, RelNode relNode,
// TODO: Need a better way to dynamically modify the script implementer based on the schema
if (schema.startsWith("VENICE")
&& !targetFieldNames.contains(field.getName())) {
i++;
continue;
}
if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
cols.add(i);
}
i++;
}
return RelOptUtil.createProject(relNode, cols);
return createForceProject(relNode, cols);
}
}

static RelNode createForceProject(final RelNode child, final List<Integer> posList) {
return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList);
}

// By default, "projectNamed" will try to provide an optimization by not creating a new project if the
// field types are the same. This is not desirable in the insert case as the field names need to match the sink.
//
// Example:
// INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT * FROM `KAFKA`.`existing-topic-1`;
// Without forced projection this will get optimized to:
// INSERT INTO `my-store` (`KEYFIELD`, `VARCHARFIELD`) SELECT * FROM `KAFKA`.`existing-topic-1`;
// With forced project this will resolve as:
// INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT `KEYFIELD` AS `KEY_id`, \
// `VARCHARFIELD` AS `stringField` FROM `KAFKA`.`existing-topic-1`;
//
// This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols); which does not allow
// overriding the "force" argument of "projectNamed".
static RelNode createForceProject(final RelFactories.ProjectFactory factory,
final RelNode child, final List<Integer> posList) {
RelDataType rowType = child.getRowType();
final List<String> fieldNames = rowType.getFieldNames();
final RelBuilder relBuilder =
RelBuilder.proto(factory).create(child.getCluster(), null);
final List<RexNode> exprs = new AbstractList<RexNode>() {
@Override public int size() {
return posList.size();
}

@Override public RexNode get(int index) {
final int pos = posList.get(index);
return relBuilder.getRexBuilder().makeInputRef(child, pos);
}
};
final List<String> names = Util.select(fieldNames, posList);
return relBuilder
.push(child)
.projectNamed(exprs, names, true)
.build();
}

/** Implements a CREATE DATABASE IF NOT EXISTS statement */
class DatabaseImplementor implements ScriptImplementor {
private final String database;
Expand Down
6 changes: 3 additions & 3 deletions hoptimator-venice/src/test/resources/venice-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ spec:
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `test-store` (`KEY_string` VARCHAR, `string` VARCHAR) WITH ('storeName'='test-store', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_string', 'partial-update-mode'='true')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`KEY_string` VARCHAR, `string` VARCHAR) WITH ('storeName'='test-store-1', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_string', 'partial-update-mode'='true')
- INSERT INTO `test-store-1` (`KEY_string`, `string`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
- CREATE TABLE IF NOT EXISTS `test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true')
- CREATE TABLE IF NOT EXISTS `test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store-1', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true')
- INSERT INTO `test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down

0 comments on commit 8c6b927

Please sign in to comment.