diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java index ffef6e9..5a1cb5d 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java @@ -350,7 +350,7 @@ private static String applyTransform(String value, String transform) { res = res.replace("\n", ""); break; default: - log.info("Transformation function '{}' not found", f); + log.warn("Transformation function '{}' not found", f); break; } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java index ba446b9..2add458 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java @@ -1,8 +1,8 @@ package com.linkedin.hoptimator.util; import java.util.Collections; -import java.util.List; import java.util.LinkedHashMap; +import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -19,13 +19,13 @@ private DataTypeUtils() { /** * Flattens nested structs and complex arrays. - * - * Nested structs like `FOO Row(BAR Row(QUX VARCHAR)))` are promoted to + * + * Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to * top-level fields like `FOO$BAR$QUX VARCHAR`. * * Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are * unchanged. - * + * */ public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) { if (!dataType.isStruct()) { @@ -56,7 +56,7 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data /** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */ public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) { if (!dataType.isStruct()) { - throw new IllegalArgumentException("Can only unflatten a struct type."); + throw new IllegalArgumentException("Can only unflatten a struct type."); } Node root = new Node(); for (RelDataTypeField field : dataType.getFieldList()) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java index c29c5fc..c66780c 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java @@ -2,12 +2,10 @@ import javax.sql.DataSource; -import org.apache.calcite.adapter.jdbc.JdbcConvention; import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Schemas; -import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlDialectFactory; import org.apache.calcite.sql.SqlDialectFactoryImpl; diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 8f3582f..06b8827 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -14,7 +15,8 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.util.Litmus; import org.apache.calcite.util.Pair; @@ -27,6 +29,7 @@ import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.util.ConnectionService; +import com.linkedin.hoptimator.util.DataTypeUtils; import com.linkedin.hoptimator.util.DeploymentService; @@ -93,12 +96,14 @@ public void setSink(String database, List path, RelDataType rowType, Map private Map addKeysAsOption(Map options, RelDataType rowType) { Map newOptions = new HashMap<>(options); + RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)); + // If the keys are already set, don't overwrite them if (newOptions.containsKey(KEY_OPTION)) { return newOptions; } - String keyString = rowType.getFieldList().stream() - .map(RelDataTypeField::getName) + String keyString = flattened.getFieldList().stream() + .map(x -> x.getName().replaceAll("\\$", "_")) .filter(name -> name.startsWith(KEY_PREFIX)) .collect(Collectors.joining(";")); if (!keyString.isEmpty()) { @@ -125,7 +130,7 @@ public Pipeline pipeline() throws SQLException { Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions); ConnectionService.configure(sink, Sink.class); Job job = new Job(sink, sql()); - RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); + RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); deployables.addAll(DeploymentService.deployables(sink, Sink.class)); deployables.addAll(DeploymentService.deployables(job, Job.class)); return new Pipeline(deployables); @@ -144,8 +149,8 @@ public Function sql() throws SQLException { Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions); Map sinkConfigs = ConnectionService.configure(sink, Sink.class); script = script.connector(sink.table(), targetRowType, sinkConfigs); - script = script.insert(sink.table(), sink.schema(), query, targetFields); - RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); + script = script.insert(sink.table(), query, targetFields); + RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); return script.seal(); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java index 7ac2c02..fc314bb 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java @@ -8,10 +8,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import com.linkedin.hoptimator.util.DataTypeUtils; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.Convention; @@ -35,14 +31,16 @@ import org.apache.calcite.rel.logical.LogicalJoin; import org.apache.calcite.rel.logical.LogicalProject; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import com.linkedin.hoptimator.util.DataTypeUtils; + public final class PipelineRules { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 4b8b60e..65f80de 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -103,9 +103,8 @@ default ScriptImplementor database(String database) { } /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */ - default ScriptImplementor insert(String name, String schema, RelNode relNode, - ImmutableList> targetFields) { - return with(new InsertImplementor(name, schema, relNode, targetFields)); + default ScriptImplementor insert(String name, RelNode relNode, ImmutableList> targetFields) { + return with(new InsertImplementor(name, relNode, targetFields)); } /** Render the script as DDL/SQL in the default dialect */ @@ -171,7 +170,7 @@ public void implement(SqlWriter w) { if (select.getSelectList() != null) { select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); } - select.accept(UNFLATTEN_MEMBER_ACCESS).unparse(w, 0, 0); + select.accept(new UnflattenMemberAccess(this)).unparse(w, 0, 0); } // A `ROW(...)` operator which will unparse as just `(...)`. @@ -191,17 +190,31 @@ public SqlNode visit(SqlCall call) { } }; - // a shuttle that replaces `FOO$BAR` with `FOO.BAR` - private static final SqlShuttle UNFLATTEN_MEMBER_ACCESS = new SqlShuttle() { + private class UnflattenMemberAccess extends SqlShuttle { + private final Set sinkFieldList; + + UnflattenMemberAccess(QueryImplementor outer) { + this.sinkFieldList = outer.relNode.getRowType().getFieldList() + .stream() + .map(RelDataTypeField::getName) + .collect(Collectors.toSet()); + } + + // SqlShuttle gets called for every field in SELECT and every table name in FROM alike + // For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR` @Override public SqlNode visit(SqlIdentifier id) { - SqlIdentifier replacement = new SqlIdentifier(id.names.stream() - .flatMap(x -> Stream.of(x.split("\\$"))) - .collect(Collectors.toList()), SqlParserPos.ZERO); - id.assignNamesFrom(replacement); + if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) { + id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO)); + } else { + SqlIdentifier replacement = new SqlIdentifier(id.names.stream() + .flatMap(x -> Stream.of(x.split("\\$"))) + .collect(Collectors.toList()), SqlParserPos.ZERO); + id.assignNamesFrom(replacement); + } return id; } - }; + } } /** @@ -270,14 +283,11 @@ public void implement(SqlWriter w) { * */ class InsertImplementor implements ScriptImplementor { private final String name; - private final String schema; private final RelNode relNode; private final ImmutableList> targetFields; - public InsertImplementor(String name, String schema, RelNode relNode, - ImmutableList> targetFields) { + public InsertImplementor(String name, RelNode relNode, ImmutableList> targetFields) { this.name = name; - this.schema = schema; this.relNode = relNode; this.targetFields = targetFields; } @@ -286,29 +296,21 @@ public InsertImplementor(String name, String schema, RelNode relNode, public void implement(SqlWriter w) { w.keyword("INSERT INTO"); (new IdentifierImplementor(name)).implement(w); -// SqlWriter.Frame frame1 = w.startList("(", ")"); - RelNode project = dropFields(schema, relNode, targetFields); + RelNode project = dropFields(relNode, targetFields); (new ColumnListImplementor(project.getRowType())).implement(w); -// w.endList(frame1); (new QueryImplementor(project)).implement(w); w.literal(";"); } // Drops NULL fields - // Drops additional fields by schema - private static RelNode dropFields(String schema, RelNode relNode, - ImmutableList> targetFields) { + // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ... + private static RelNode dropFields(RelNode relNode, ImmutableList> targetFields) { List cols = new ArrayList<>(); int i = 0; Set targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet()); for (RelDataTypeField field : relNode.getRowType().getFieldList()) { - // TODO: Need a better way to dynamically modify the script implementer based on the schema - if (schema.startsWith("VENICE") - && !targetFieldNames.contains(field.getName())) { - i++; - continue; - } - if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) { + if (targetFieldNames.contains(field.getName()) + && !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) { cols.add(i); } i++; diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java index 968273c..67aa055 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java @@ -5,8 +5,6 @@ import java.util.List; import java.util.stream.Collectors; -import com.linkedin.hoptimator.util.planner.ScriptImplementor; - import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -14,9 +12,10 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.Litmus; - -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.util.planner.ScriptImplementor; public class TestDataTypeUtils { @@ -33,7 +32,7 @@ public void flattenUnflatten() { builder3.add("FOO", builder1.build()); builder3.add("BAR", builder2.build()); RelDataType rowType = builder3.build(); - Assertions.assertEquals(2, rowType.getFieldList().size()); + Assertions.assertEquals(2, rowType.getFieldList().size()); RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); Assertions.assertEquals(3, flattenedType.getFieldList().size()); List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) @@ -65,7 +64,7 @@ public void flattenNestedArrays() { builder3.add("FOO", typeFactory.createArrayType(builder1.build(), -1)); builder3.add("BAR", typeFactory.createArrayType(builder2.build(), -1)); RelDataType rowType = builder3.build(); - Assertions.assertEquals(2, rowType.getFieldList().size()); + Assertions.assertEquals(2, rowType.getFieldList().size()); RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory); Assertions.assertEquals(2, flattenedType.getFieldList().size()); List flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName()) @@ -77,5 +76,5 @@ public void flattenNestedArrays() { Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ANY ARRAY, " + "`BAR` ANY ARRAY) WITH ();", flattenedConnector, "Flattened connector should have simplified arrays"); - } -} + } +} diff --git a/hoptimator-venice/build.gradle b/hoptimator-venice/build.gradle index bfc8d74..6e4cb11 100644 --- a/hoptimator-venice/build.gradle +++ b/hoptimator-venice/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { implementation project(':hoptimator-avro') + implementation project(':hoptimator-util') implementation libs.calcite.core implementation(libs.venice) { // Venice pulls in snakeyaml v2.0 which has conflicting APIs with 1.x diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java index 348b5ea..67f081f 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java @@ -20,7 +20,7 @@ protected String discoverLeaderController() { URL controllerUrl = new URL(super.discoverLeaderController()); return controllerUrl.getProtocol() + "://localhost:" + controllerUrl.getPort(); } catch (MalformedURLException e) { - throw new RuntimeException(e); + throw new IllegalArgumentException(e); } } } diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java index 07a0ab8..3a24429 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java @@ -13,7 +13,7 @@ import org.apache.calcite.schema.SchemaPlus; -/** JDBC driver for Venice topics. */ +/** JDBC driver for Venice stores. */ public class VeniceDriver extends Driver { public static final String CATALOG_NAME = "VENICE"; diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java index 85611ae..e4d97cf 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java @@ -1,21 +1,18 @@ package com.linkedin.hoptimator.venice; -import java.util.ArrayList; -import java.util.List; - import org.apache.avro.Schema; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.impl.AbstractTable; import com.linkedin.hoptimator.avro.AvroConverter; +import com.linkedin.hoptimator.util.DataTypeUtils; import com.linkedin.venice.client.schema.StoreSchemaFetcher; /** A batch of records from a Venice store. */ public class VeniceStore extends AbstractTable { - private static final String KEY_PREFIX = "KEY_"; private final StoreSchemaFetcher storeSchemaFetcher; public VeniceStore(StoreSchemaFetcher storeSchemaFetcher) { @@ -27,37 +24,15 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { Schema keySchema = storeSchemaFetcher.getKeySchema(); Schema valueSchema = storeSchemaFetcher.getLatestValueSchema(); - // Modify keySchema fields to contain a prefix of "KEY_" for their name in order to not clash with the value schema - // Combine fields from both the modified keySchema and the valueSchema - List combinedFields = new ArrayList<>(); - Schema combinedSchema; - if (keySchema.getType() == Schema.Type.RECORD) { - for (Schema.Field field : keySchema.getFields()) { - Schema.Field modifiedField = - new Schema.Field(KEY_PREFIX + field.name(), field.schema(), field.doc(), field.defaultVal(), field.order()); - combinedFields.add(modifiedField); - } - } else { - Schema.Field modifiedField = - new Schema.Field(KEY_PREFIX + keySchema.getName(), keySchema, keySchema.getDoc()); - combinedFields.add(modifiedField); - } - - if (valueSchema.getType() == Schema.Type.RECORD) { - for (Schema.Field field : valueSchema.getFields()) { - Schema.Field copiedField = - new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal(), field.order()); - combinedFields.add(copiedField); - } - combinedSchema = Schema.createRecord(valueSchema.getName(), valueSchema.getDoc(), valueSchema.getNamespace(), - keySchema.isError() || valueSchema.isError(), combinedFields); - } else { - Schema.Field copiedField = - new Schema.Field(valueSchema.getName(), valueSchema, valueSchema.getDoc()); - combinedFields.add(copiedField); - combinedSchema = Schema.createRecord("VeniceSchema", null, null, false, combinedFields); - } - return AvroConverter.rel(combinedSchema, typeFactory); + // Venice contains both a key schema and a value schema. Since we need to pass back one joint schema, + // and to avoid name collisions, all key fields are structured as "KEY$foo". + RelDataType key = rel(keySchema, typeFactory); + RelDataType value = rel(valueSchema, typeFactory); + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + builder.addAll(value.getFieldList()); + builder.add("KEY", key); + RelDataType combinedSchema = builder.build(); + return DataTypeUtils.flatten(combinedSchema, typeFactory); } protected RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java index 398d3a2..27479ba 100644 --- a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java +++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java @@ -10,7 +10,19 @@ public class TestSqlScripts extends QuidemTestBase { @Test @Tag("integration") - public void veniceDdlScript() throws Exception { - run("venice-ddl.id"); + public void veniceDdlSelectScript() throws Exception { + run("venice-ddl-select.id"); + } + + @Test + @Tag("integration") + public void veniceDdlInsertAllScript() throws Exception { + run("venice-ddl-insert-all.id"); + } + + @Test + @Tag("integration") + public void veniceDdlInsertPartialScript() throws Exception { + run("venice-ddl-insert-partial.id"); } } diff --git a/hoptimator-venice/src/test/resources/venice-ddl.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id similarity index 51% rename from hoptimator-venice/src/test/resources/venice-ddl.id rename to hoptimator-venice/src/test/resources/venice-ddl-insert-all.id index 578aa34..49254fe 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id @@ -2,7 +2,6 @@ !use k8s insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store"; - apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: @@ -25,11 +24,11 @@ spec: job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - CREATE TABLE IF NOT EXISTS `test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true') - - CREATE TABLE IF NOT EXISTS `test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('value.fields-include'='EXCEPT_KEY', 'storeName'='test-store-1', 'connector'='venice', 'key.fields-prefix'='KEY_', 'key.fields'='KEY_id', 'partial-update-mode'='true') - - INSERT INTO `test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` + - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` jarURI: local:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running -!specify \ No newline at end of file +!specify diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id new file mode 100644 index 0000000..be1fa60 --- /dev/null +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -0,0 +1,34 @@ +!set outputformat mysql +!use k8s + +insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "intField") select "KEY$id", "stringField" from "VENICE-CLUSTER0"."test-store"; +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: venice-cluster0-test-store-1 +spec: + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 0.1 + taskManager: + resource: + memory: "2048m" + cpu: 0.1 + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS INTEGER) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store` + jarURI: local:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running +!specify diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id new file mode 100644 index 0000000..6327504 --- /dev/null +++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id @@ -0,0 +1,34 @@ +!set outputformat mysql +!use k8s + +select * from "VENICE-CLUSTER0"."test-store-1"; +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: pipeline-sink +spec: + image: docker.io/library/hoptimator-flink-runner + imagePullPolicy: Never + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 0.1 + taskManager: + resource: + memory: "2048m" + cpu: 0.1 + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH () + - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1` + jarURI: local:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running +!specify