diff --git a/.gitignore b/.gitignore
index e1796c2..59743ea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
 .gradle
 .idea/
+/build
 */build/
 */*.iml
 ./models/external/
diff --git a/Makefile b/Makefile
index 772de79..d79b101 100644
--- a/Makefile
+++ b/Makefile
@@ -12,30 +12,31 @@ build:
 
 bounce: build undeploy deploy deploy-samples deploy-config deploy-demo
 
-# Integration tests expect K8s and Kafka to be running
-integration-tests: deploy-dev-environment deploy-samples
-	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
-	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
-	kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
-	./gradlew intTest || kill `cat port-forward.pid`
-	kill `cat port-forward.pid`
-
 clean:
 	./gradlew clean
 
 deploy-config:
 	kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -
 
+undeploy-config:
+	kubectl delete configmap hoptimator-configmap || echo "skipping"
+
 deploy: deploy-config
 	kubectl apply -f ./hoptimator-k8s/src/main/resources/
 	kubectl apply -f ./deploy
 
+undeploy: undeploy-config
+	kubectl delete -f ./deploy || echo "skipping"
+	kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
+
 quickstart: build deploy
 
 deploy-demo: deploy
 	kubectl apply -f ./deploy/samples/demodb.yaml
 
+undeploy-demo: undeploy
+	kubectl delete -f ./deploy/samples/demodb.yaml
+
 deploy-samples: deploy
 	kubectl wait --for=condition=Established=True \
 		crds/subscriptions.hoptimator.linkedin.com \
@@ -43,11 +44,23 @@ deploy-samples: deploy
 		crds/sqljobs.hoptimator.linkedin.com
 	kubectl apply -f ./deploy/samples
 
-deploy-dev-environment: deploy-config
-	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
-	kubectl create namespace kafka || echo "skipping"
+undeploy-samples: undeploy
+	kubectl delete -f ./deploy/samples || echo "skipping"
+
+deploy-flink:
 	helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
 	helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
+
+undeploy-flink:
+	kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
+	kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
+	kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
+	kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
+	helm uninstall flink-kubernetes-operator || echo "skipping"
+
+deploy-kafka: deploy deploy-flink
+	kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
+	kubectl create namespace kafka || echo "skipping"
 	kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
 	kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
 	kubectl apply -f ./hoptimator-k8s/src/main/resources/
@@ -55,7 +68,7 @@ deploy-dev-environment: deploy-config
 	kubectl apply -f ./deploy/samples/demodb.yaml
 	kubectl apply -f ./deploy/samples/kafkadb.yaml
 
-undeploy-dev-environment:
+undeploy-kafka:
 	kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
 	kubectl delete strimzi -n kafka --all || echo "skipping"
 	kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
@@ -65,12 +78,31 @@ undeploy-dev-environment:
 	kubectl delete -f ./deploy/dev || echo "skipping"
 	kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
 	kubectl delete namespace kafka || echo "skipping"
-	helm uninstall flink-kubernetes-operator || echo "skipping"
 	kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
 
-undeploy: undeploy-dev-environment
-	kubectl delete -f ./deploy || echo "skipping"
-	kubectl delete configmap hoptimator-configmap || echo "skipping"
+# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
+deploy-venice: deploy deploy-flink
+	docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
+	docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
+	docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
+	kubectl apply -f ./deploy/samples/venicedb.yaml
+
+undeploy-venice:
+	kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
+	docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down
+
+deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice
+
+undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy
+
+# Integration tests expect K8s, Kafka, and Venice to be running
+integration-tests: deploy-dev-environment deploy-samples
+	kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
+	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
+	kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
+	kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
+	./gradlew intTest || kill `cat port-forward.pid`
+	kill `cat port-forward.pid`
 
 generate-models:
 	./generate-models.sh
@@ -80,4 +112,4 @@ release:
 	test -n "$(VERSION)" # MISSING ARG: $$VERSION
 	./gradlew publish
 
-.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
+.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release
diff --git a/README.md b/README.md
index bb25e84..b2284a5 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.
 ```
 $ make install  # build and install SQL CLI
-$ make deploy-dev-environment  # start local Kafka & Flink setups
+$ make deploy-dev-environment  # start all local dev setups
 $ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 &  # forward external Kafka port for use by SQL CLI
 $ ./hoptimator  # start the SQL CLI
 > !intro
diff --git a/deploy/docker/docker-compose-single-dc-setup.yaml b/deploy/docker/docker-compose-single-dc-setup.yaml
new file mode 100644
index 0000000..760d1cd
--- /dev/null
+++ b/deploy/docker/docker-compose-single-dc-setup.yaml
@@ -0,0 +1,84 @@
+services:
+  zookeeper:
+    image: venicedb/apache-zookeeper:3.9.0
+    container_name: zookeeper
+    hostname: zookeeper
+    healthcheck:
+      test: ["CMD-SHELL", "echo ruok | nc zookeeper 2181"]
+      start_period: 10s
+      interval: 5s
+      timeout: 5s
+      retries: 5
+
+  kafka:
+    image: venicedb/apache-kafka:3.3.1
+    container_name: kafka
+    hostname: kafka
+    environment:
+      - ZOOKEEPER_ADDRESS=zookeeper:2181
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD-SHELL", "bash -x bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
+      start_period: 60s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-controller:
+    image: venicedb/venice-controller:0.4.340
+    container_name: venice-controller
+    hostname: venice-controller
+    depends_on:
+      kafka:
+        condition: service_healthy
+    ports:
+      - 5555:5555
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-server:
+    image: venicedb/venice-server:0.4.340
+    container_name: venice-server
+    hostname: venice-server
+    depends_on:
+      venice-controller:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-router:
+    image: venicedb/venice-router:0.4.340
+    container_name: venice-router
+    hostname: venice-router
+    depends_on:
+      venice-server:
+        condition: service_healthy
+    ports:
+      - 7777:7777
+    healthcheck:
+      test: ["CMD-SHELL", "sleep 5"]
+      start_period: 20s
+      interval: 5s
+      timeout: 20s
+      retries: 5
+
+  venice-client:
+    image: venicedb/venice-client:0.4.340
+    container_name: venice-client
+    hostname: venice-client
+    tty: true
+    volumes:
+      - ./venice:/opt/venice/schemas
+    depends_on:
+      venice-router:
+        condition: service_healthy
diff --git a/deploy/docker/venice/keySchema.avsc b/deploy/docker/venice/keySchema.avsc
new file mode 100644
index 0000000..4a0f80f
--- /dev/null
+++ b/deploy/docker/venice/keySchema.avsc
@@ -0,0 +1,11 @@
+{
+  "type": "record",
+  "name": "SampleTableKey",
+  "doc": "SampleTableKey",
+  "fields": [
+    {
+      "name": "id",
+      "type": "int"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/deploy/docker/venice/valueSchema.avsc b/deploy/docker/venice/valueSchema.avsc
new file mode 100644
index 0000000..92f9b89
--- /dev/null
+++ b/deploy/docker/venice/valueSchema.avsc
@@ -0,0 +1,23 @@
+{
+  "type": "record",
+  "name": "SampleTableValue",
+  "doc": "SampleTableValue",
+  "fields": [
+    {
+      "name": "intField",
+      "type": [
+        "null",
+        "int"
+      ],
+      "default": null
+    },
+    {
+      "name": "stringField",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null
+    }
+  ]
+}
\ No newline at end of file
diff --git a/deploy/samples/venicedb.yaml b/deploy/samples/venicedb.yaml
new file mode 100644
index 0000000..3a99357
--- /dev/null
+++ b/deploy/samples/venicedb.yaml
@@ -0,0 +1,25 @@
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: Database
+metadata:
+  name: venice-cluster0
+spec:
+  schema: VENICE-CLUSTER0
+  url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
+  dialect: Calcite
+
+---
+
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: TableTemplate
+metadata:
+  name: venice-template-cluster0
+spec:
+  databases:
+    - venice-cluster0
+  connector: |
+    connector = venice
+    storeName = {{table}}
+    partial-update-mode = true
+    key.fields-prefix = KEY_
+    key.fields = {{keys}}
+    value.fields-include = EXCEPT_KEY
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3fadf2b..ae531ef 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -29,3 +29,5 @@ slf4j-api = "org.slf4j:slf4j-api:1.7.30"
 sqlline = "sqlline:sqlline:1.12.0"
 commons-cli = "commons-cli:commons-cli:1.4"
 quidem = "net.hydromatic:quidem:0.11"
+venice = "com.linkedin.venice:venice-common:0.4.376"
+venice-client = "com.linkedin.venice:venice-thin-client:0.4.376"
diff --git a/hoptimator-avro/build.gradle b/hoptimator-avro/build.gradle
index 475dac0..3311448 100644
--- a/hoptimator-avro/build.gradle
+++ b/hoptimator-avro/build.gradle
@@ -42,8 +42,8 @@ publishing {
         license {
           name = 'BSD 2-Clause'
           url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
-          }
         }
+        }
       scm {
         connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
         developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
@@ -52,4 +52,4 @@ publishing {
     }
   }
 }
-}
+}
\ No newline at end of file
diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
index f585a0c..5a1cb5d 100644
--- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
+++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/Resource.java
@@ -20,6 +20,9 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Represents something required by a Table.
@@ -37,6 +40,7 @@
  * for informational/debugging purposes.
  */
 public abstract class Resource {
+  private static final Logger log = LoggerFactory.getLogger(Resource.class);
   private final String template;
   private final SortedMap<String, Supplier<String>> properties = new TreeMap<>();
   private final List<Resource> inputs = new ArrayList<>();
@@ -345,6 +349,9 @@ private static String applyTransform(String value, String transform) {
         case "concat":
           res = res.replace("\n", "");
           break;
+        default:
+          log.warn("Transformation function '{}' not found", f);
+          break;
       }
     }
     return res;
diff --git a/hoptimator-cli/build.gradle b/hoptimator-cli/build.gradle
index 7ec8d41..ad460fd 100644
--- a/hoptimator-cli/build.gradle
+++ b/hoptimator-cli/build.gradle
@@ -12,6 +12,7 @@ dependencies {
   implementation project(':hoptimator-demodb')
   implementation project(':hoptimator-jdbc')
   implementation project(':hoptimator-kafka')
+  implementation project(':hoptimator-venice')
   implementation project(':hoptimator-k8s')
   implementation project(':hoptimator-util')
   implementation libs.calcite.core
diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
index 13ea162..296d24f 100644
--- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
+++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
@@ -8,7 +8,7 @@
 import java.util.Scanner;
 
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.jline.reader.Completer;
 
 import com.linkedin.hoptimator.SqlDialect;
@@ -87,8 +87,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
       String sql = split[1];
       CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
       try {
-        RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
-        PipelineRel.Implementor plan = DeploymentService.plan(rel);
+        RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
+        PipelineRel.Implementor plan = DeploymentService.plan(root);
         sqlline.output(plan.sql().apply(SqlDialect.ANSI));
       } catch (SQLException e) {
         sqlline.error(e);
@@ -155,9 +155,9 @@ public void execute(String line, DispatchCallback dispatchCallback) {
       }
       String sql = split[1];
       CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
-      RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
+      RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
       try {
-        List<String> specs = DeploymentService.plan(rel).pipeline().specify();
+        List<String> specs = DeploymentService.plan(root).pipeline().specify();
         specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
       } catch (SQLException e) {
         sqlline.error(e);
diff --git a/hoptimator-cli/src/main/resources/intro.txt b/hoptimator-cli/src/main/resources/intro.txt
index c4413ac..1b6b00b 100644
--- a/hoptimator-cli/src/main/resources/intro.txt
+++ b/hoptimator-cli/src/main/resources/intro.txt
@@ -4,7 +4,7 @@ Try:
 > !tables
 > !schemas
 > create view foo as select * from ads.ad_clicks natural join profile.members;
-> !yaml select * from foo
+> !specify select * from foo
 > !pipeline select * from foo
diff --git a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
index 2c5ea6e..5acccfa 100644
--- a/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
+++ b/hoptimator-demodb/src/main/java/com/linkedin/hoptimator/demodb/DemoDriver.java
@@ -13,7 +13,6 @@
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.Driver;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.AbstractSchema;
 
 
 /** JDBC driver with fake in-memory data. */
@@ -59,9 +58,6 @@ public Connection connect(String url, Properties props) throws SQLException {
       if (schemas.isEmpty() || schemas.contains("ADS")) {
         rootSchema.add("ADS", new AdsSchema());
       }
-      if (schemas.isEmpty() || schemas.contains("VENICE")) {
-        rootSchema.add("VENICE", new AbstractSchema());
-      }
       return connection;
     } catch (Exception e) {
       throw new SQLException("Problem loading " + url, e);
diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
index 73ba94b..e65c7a1 100644
--- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
+++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java
@@ -27,7 +27,7 @@
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.Function;
@@ -165,8 +165,8 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con
       RelDataType rowType = materializedViewTable.getRowType(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
 
       // Plan a pipeline to materialize the view.
-      RelNode rel = HoptimatorDriver.convert(context, sql).root.rel;
-      PipelineRel.Implementor plan = DeploymentService.plan(rel);
+      RelRoot root = HoptimatorDriver.convert(context, sql).root;
+      PipelineRel.Implementor plan = DeploymentService.plan(root);
       plan.setSink(database, viewPath, rowType, Collections.emptyMap());
       Pipeline pipeline = plan.pipeline();
diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java
index 207ba77..f17fad1 100644
--- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java
+++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java
@@ -10,14 +10,16 @@
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.junit.jupiter.api.Assertions;
 
 import net.hydromatic.quidem.AbstractCommand;
@@ -31,7 +33,7 @@
 public abstract class QuidemTestBase {
 
   protected void run(String resourceName) throws IOException, URISyntaxException {
-    run(Thread.currentThread().getContextClassLoader().getResource(resourceName).toURI());
+    run(Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource(resourceName)).toURI());
   }
 
   protected void run(URI resource) throws IOException {
@@ -48,35 +50,36 @@ protected void run(URI resource) throws IOException {
     }
     List<String> input = Files.readAllLines(in.toPath(), StandardCharsets.UTF_8);
     List<String> output = Files.readAllLines(out.toPath(), StandardCharsets.UTF_8);
-    Assertions.assertTrue(!input.isEmpty(), "input script is empty");
-    Assertions.assertTrue(!output.isEmpty(), "script output is empty");
+    Assertions.assertFalse(input.isEmpty(), "input script is empty");
+    Assertions.assertFalse(output.isEmpty(), "script output is empty");
     for (String line : output) {
       System.out.println(line);
     }
     Assertions.assertIterableEquals(input, output);
   }
 
-  private static class CustomCommandHandler implements CommandHandler {
+  private static final class CustomCommandHandler implements CommandHandler {
     @Override
     public Command parseCommand(List<String> lines, List<String> content, final String line) {
-      List<String> copy = new ArrayList<>();
-      copy.addAll(lines);
+      List<String> copy = new ArrayList<>(lines);
       if (line.startsWith("spec")) {
         return new AbstractCommand() {
           @Override
           public void execute(Context context, boolean execute) throws Exception {
             if (execute) {
-              if (!(context.connection() instanceof CalciteConnection)) {
-                throw new IllegalArgumentException("This connection doesn't support `!specify`.");
+              try (Connection connection = context.connection()) {
+                if (!(connection instanceof CalciteConnection)) {
+                  throw new IllegalArgumentException("This connection doesn't support `!specify`.");
+                }
+                String sql = context.previousSqlCommand().sql;
+                CalciteConnection conn = (CalciteConnection) connection;
+                RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
+                String specs =
+                    DeploymentService.plan(root).pipeline().specify().stream().sorted()
+                        .collect(Collectors.joining("---\n"));
+                String[] lines = specs.replaceAll(";\n", "\n").split("\n");
+                context.echo(Arrays.asList(lines));
               }
-              String sql = context.previousSqlCommand().sql;
-              CalciteConnection conn = (CalciteConnection) context.connection();
-              RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
-              String specs =
-                  DeploymentService.plan(rel).pipeline().specify().stream().sorted()
-                      .collect(Collectors.joining("---\n"));
-              String[] lines = specs.replaceAll(";\n", "\n").split("\n");
-              context.echo(Arrays.asList(lines));
             } else {
               context.echo(content);
             }
diff --git a/hoptimator-k8s/build.gradle b/hoptimator-k8s/build.gradle
index 726d65e..c0ee7db 100644
--- a/hoptimator-k8s/build.gradle
+++ b/hoptimator-k8s/build.gradle
@@ -13,6 +13,7 @@ dependencies {
   // These are included in case the demo databases are deployed.
   testRuntimeOnly project(':hoptimator-demodb')
   testRuntimeOnly project(':hoptimator-kafka')
+  testRuntimeOnly project(':hoptimator-venice')
 
   testImplementation(testFixtures(project(':hoptimator-jdbc')))
   testImplementation(platform('org.junit:junit-bom:5.11.3'))
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java
index 64529bf..75e9130 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java
@@ -9,8 +9,8 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 
-import com.linkedin.hoptimator.Source;
 import com.linkedin.hoptimator.Connector;
+import com.linkedin.hoptimator.Source;
 import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
 import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
 import com.linkedin.hoptimator.util.Template;
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java
index 26a684a..e068145 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java
@@ -6,11 +6,11 @@
 
 import org.apache.calcite.schema.impl.ViewTable;
 
+import com.linkedin.hoptimator.Deployer;
+import com.linkedin.hoptimator.DeployerProvider;
 import com.linkedin.hoptimator.Job;
 import com.linkedin.hoptimator.MaterializedView;
 import com.linkedin.hoptimator.Source;
-import com.linkedin.hoptimator.Deployer;
-import com.linkedin.hoptimator.DeployerProvider;
 
 
 public class K8sDeployerProvider implements DeployerProvider {
diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java
index cb4e92f..69ab98f 100644
--- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java
+++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java
@@ -6,10 +6,6 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.calcite.sql.dialect.AnsiSqlDialect;
-import org.apache.calcite.sql.dialect.CalciteSqlDialect;
-import org.apache.calcite.sql.dialect.MysqlSqlDialect;
-
 import com.linkedin.hoptimator.Job;
 import com.linkedin.hoptimator.SqlDialect;
 import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
index ba446b9..2add458 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DataTypeUtils.java
@@ -1,8 +1,8 @@
 package com.linkedin.hoptimator.util;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -19,13 +19,13 @@ private DataTypeUtils() {
 
   /**
    * Flattens nested structs and complex arrays.
-   *
-   * Nested structs like `FOO Row(BAR Row(QUX VARCHAR)))` are promoted to
+   *
+   * Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to
    * top-level fields like `FOO$BAR$QUX VARCHAR`.
    *
    * Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are
    * unchanged.
-
+
    */
  public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
     if (!dataType.isStruct()) {
@@ -56,7 +56,7 @@ private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType data
   /** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */
   public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
     if (!dataType.isStruct()) {
-      throw new IllegalArgumentException("Can only unflatten a struct type.");
+      throw new IllegalArgumentException("Can only unflatten a struct type.");
     }
     Node root = new Node();
     for (RelDataTypeField field : dataType.getFieldList()) {
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
index 893f5b7..8bd160f 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/DeploymentService.java
@@ -10,7 +10,7 @@
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.tools.Program;
 import org.apache.calcite.tools.Programs;
 
@@ -89,15 +89,15 @@ public List<String> specify() throws SQLException {
   }
 
   /** Plans a deployable Pipeline which implements the query. */
-  public static PipelineRel.Implementor plan(RelNode rel) throws SQLException {
-    RelTraitSet traitSet = rel.getTraitSet().simplify().replace(PipelineRel.CONVENTION);
+  public static PipelineRel.Implementor plan(RelRoot root) throws SQLException {
+    RelTraitSet traitSet = root.rel.getTraitSet().simplify().replace(PipelineRel.CONVENTION);
     Program program = Programs.standard();
-    RelOptPlanner planner = rel.getCluster().getPlanner();
+    RelOptPlanner planner = root.rel.getCluster().getPlanner();
     PipelineRules.rules().forEach(x -> planner.addRule(x));
     // TODO add materializations here (currently empty list)
-    PipelineRel plan = (PipelineRel) program.run(rel.getCluster().getPlanner(), rel, traitSet, Collections.emptyList(),
+    PipelineRel plan = (PipelineRel) program.run(planner, root.rel, traitSet, Collections.emptyList(),
         Collections.emptyList());
-    PipelineRel.Implementor implementor = new PipelineRel.Implementor();
+    PipelineRel.Implementor implementor = new PipelineRel.Implementor(root.fields);
     implementor.visit(plan);
     return implementor;
   }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java
index c29c5fc..c66780c 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/HoptimatorJdbcSchema.java
@@ -2,12 +2,10 @@
 
 import javax.sql.DataSource;
 
-import org.apache.calcite.adapter.jdbc.JdbcConvention;
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlDialectFactory;
 import org.apache.calcite.sql.SqlDialectFactoryImpl;
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
index bcfe9a7..235be0d 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java
@@ -8,12 +8,18 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.ImmutableList;
 
 import com.linkedin.hoptimator.Deployable;
 import com.linkedin.hoptimator.Job;
@@ -22,6 +28,7 @@
 import com.linkedin.hoptimator.Source;
 import com.linkedin.hoptimator.SqlDialect;
 import com.linkedin.hoptimator.util.ConnectionService;
+import com.linkedin.hoptimator.util.DataTypeUtils;
 import com.linkedin.hoptimator.util.DeploymentService;
 
 
@@ -36,11 +43,14 @@ public interface PipelineRel extends RelNode {
 
   Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class);
+  String KEY_OPTION = "keys";
+  String KEY_PREFIX = "KEY_";
 
   void implement(Implementor implementor) throws SQLException;
 
   /** Implements a deployable Pipeline. */
   class Implementor {
+    private final ImmutableList<Pair<Integer, String>> targetFields;
     private final Map<Source, RelDataType> sources = new LinkedHashMap<>();
     private RelNode query;
     private String sinkDatabase = "pipeline";
@@ -48,9 +58,13 @@ class Implementor {
     private RelDataType sinkRowType = null;
     private Map<String, String> sinkOptions = Collections.emptyMap();
 
+    public Implementor(ImmutableList<Pair<Integer, String>> targetFields) {
+      this.targetFields = targetFields;
+    }
+
     public void visit(RelNode node) throws SQLException {
-      if (query == null) {
-        query = node;
+      if (this.query == null) {
+        this.query = node;
       }
       for (RelNode input : node.getInputs()) {
         visit(input);
       }
     }
@@ -61,11 +75,11 @@ public void visit(RelNode node) throws SQLException {
 
     /**
      * Adds a source to the pipeline.
      *
-     * This involves deploying any relevant objects, and configuring a
+     * This involves deploying any relevant objects, and configuring
      * a connector. The connector is configured via `CREATE TABLE...WITH(...)`.
      */
     public void addSource(String database, List<String> path, RelDataType rowType, Map<String, String> options) {
-      sources.put(new Source(database, path, options), rowType);
+      sources.put(new Source(database, path, addKeysAsOption(options, rowType)), rowType);
     }
 
     /**
@@ -78,7 +92,26 @@ public void setSink(String database, List<String> path, RelDataType rowType, Map
       this.sinkDatabase = database;
       this.sinkPath = path;
       this.sinkRowType = rowType;
-      this.sinkOptions = options;
+      this.sinkOptions = addKeysAsOption(options, rowType);
+    }
+
+    private Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
+      Map<String, String> newOptions = new LinkedHashMap<>(options);
+
+      RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
+
+      // If the keys are already set, don't overwrite them
+      if (newOptions.containsKey(KEY_OPTION)) {
+        return newOptions;
+      }
+      String keyString = flattened.getFieldList().stream()
+          .map(x -> x.getName().replaceAll("\\$", "_"))
+          .filter(name -> name.startsWith(KEY_PREFIX))
+          .collect(Collectors.joining(";"));
+      if (!keyString.isEmpty()) {
+        newOptions.put(KEY_OPTION, keyString);
+      }
+      return newOptions;
     }
 
     public void setQuery(RelNode query) {
@@ -89,25 +122,24 @@ public void setQuery(RelNode query) {
     public Pipeline pipeline() throws SQLException {
       List<Deployable> deployables = new ArrayList<>();
       for (Source source : sources.keySet()) {
-        DeploymentService.deployables(source, Source.class).forEach(x -> deployables.add(x));
-        Map<String, String> configs = ConnectionService.configure(source, Source.class);
+        deployables.addAll(DeploymentService.deployables(source, Source.class));
+        ConnectionService.configure(source, Source.class);
       }
       RelDataType targetRowType = sinkRowType;
       if (targetRowType == null) {
         targetRowType = query.getRowType();
       }
       Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
-      Map<String, String> sinkConfigs = ConnectionService.configure(sink, Sink.class);
+      ConnectionService.configure(sink, Sink.class);
       Job job = new Job(sink, sql());
-      RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
-      DeploymentService.deployables(sink, Sink.class).forEach(x -> deployables.add(x));
-      DeploymentService.deployables(job, Job.class).forEach(x -> deployables.add(x));
+      RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
+      deployables.addAll(DeploymentService.deployables(sink, Sink.class));
+      deployables.addAll(DeploymentService.deployables(job, Job.class));
       return new Pipeline(deployables);
     }
 
     public Function<SqlDialect, String> sql() throws SQLException {
       ScriptImplementor script = ScriptImplementor.empty();
-      List<Deployable> deployables = new ArrayList<>();
       for (Map.Entry<Source, RelDataType> source : sources.entrySet()) {
         Map<String, String> configs = ConnectionService.configure(source.getKey(), Source.class);
         script = script.connector(source.getKey().table(), source.getValue(), configs);
@@ -119,8 +151,8 @@
       Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions);
       Map<String, String> sinkConfigs = ConnectionService.configure(sink, Sink.class);
       script = script.connector(sink.table(), targetRowType, sinkConfigs);
-      script = script.insert(sink.table(), query);
-      RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
+      script = script.insert(sink.table(), query, targetFields);
+      RelOptUtil.equal(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW);
       return script.seal();
     }
   }
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java
index 7ac2c02..fc314bb 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRules.java
@@ -8,10 +8,6 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import com.linkedin.hoptimator.util.DataTypeUtils;
 
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Convention;
@@ -35,14 +31,16 @@
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 
+import com.linkedin.hoptimator.util.DataTypeUtils;
+
 
 public final class PipelineRules {
diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
index 3f91c2d..2fb4158 100644
--- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
+++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java
@@ -1,24 +1,25 @@
 package com.linkedin.hoptimator.util.planner;
 
+import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.linkedin.hoptimator.util.DataTypeUtils;
-
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
 import org.apache.calcite.rel.rel2sql.SqlImplementor;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCollectionTypeNameSpec;
@@ -40,6 +41,11 @@
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
 
 
 /**
@@ -97,8 +103,8 @@ default ScriptImplementor database(String database) {
   }
 
   /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
-  default ScriptImplementor insert(String name, RelNode relNode) {
-    return with(new InsertImplementor(name, relNode));
+  default ScriptImplementor insert(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
+    return with(new InsertImplementor(name, relNode, targetFields));
   }
 
   /** Render the script as DDL/SQL in the default dialect */
@@ -117,7 +123,7 @@ default String sql(SqlDialect dialect) {
   default Function<SqlDialect, String> seal() {
     return x -> {
       final String sql;
-      switch(x) {
+      switch (x) {
         case ANSI:
           sql = sql(AnsiSqlDialect.DEFAULT);
           break;
@@ -164,7 +170,7 @@ public void implement(SqlWriter w) {
       if (select.getSelectList() != null) {
         select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR));
       }
-      select.accept(UNFLATTEN_MEMBER_ACCESS).unparse(w, 0, 0);
+      select.accept(new UnflattenMemberAccess(this)).unparse(w, 0, 0);
     }
 
     // A `ROW(...)` operator which will unparse as just `(...)`.
@@ -184,17 +190,31 @@ public SqlNode visit(SqlCall call) {
       }
     };
 
-    // a shuttle that replaces `FOO$BAR` with `FOO.BAR`
-    private static final SqlShuttle UNFLATTEN_MEMBER_ACCESS = new SqlShuttle() {
+    private static class UnflattenMemberAccess extends SqlShuttle {
+      private final Set<String> sinkFieldList;
+
+      UnflattenMemberAccess(QueryImplementor outer) {
+        this.sinkFieldList = outer.relNode.getRowType().getFieldList()
+            .stream()
+            .map(RelDataTypeField::getName)
+            .collect(Collectors.toSet());
+      }
+
+      // SqlShuttle gets called for every field in SELECT and every table name in FROM alike
+      // For fields in SELECT, we want to unflatten them as `FOO_BAR`, for tables `FOO.BAR`
       @Override
       public SqlNode visit(SqlIdentifier id) {
-        SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
-            .flatMap(x -> Stream.of(x.split("\\$")))
-            .collect(Collectors.toList()), SqlParserPos.ZERO);
-        id.assignNamesFrom(replacement);
+        if (id.names.size() == 1 && sinkFieldList.contains(id.names.get(0))) {
+          id.assignNamesFrom(new SqlIdentifier(id.names.get(0).replaceAll("\\$", "_"), SqlParserPos.ZERO));
+        } else {
+          SqlIdentifier replacement = new SqlIdentifier(id.names.stream()
+              .flatMap(x -> Stream.of(x.split("\\$")))
+              .collect(Collectors.toList()), SqlParserPos.ZERO);
+          id.assignNamesFrom(replacement);
+        }
         return id;
       }
-    };
+    }
   }
 
   /**
@@ -264,35 +284,81 @@ public void implement(SqlWriter w) {
   class InsertImplementor implements ScriptImplementor {
     private final String name;
     private final RelNode relNode;
+    private final ImmutableList<Pair<Integer, String>> targetFields;
 
-    public InsertImplementor(String name, RelNode relNode) {
+    public InsertImplementor(String name, RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
       this.name = name;
       this.relNode = relNode;
+      this.targetFields = targetFields;
     }
 
     @Override
     public void implement(SqlWriter w) {
       w.keyword("INSERT INTO");
       (new IdentifierImplementor(name)).implement(w);
-      RelNode project = dropNullFields(relNode);
+      RelNode project = dropFields(relNode, targetFields);
       (new ColumnListImplementor(project.getRowType())).implement(w);
       (new QueryImplementor(project)).implement(w);
       w.literal(";");
     }
 
-    private static RelNode dropNullFields(RelNode relNode) {
+    // Drops NULL fields
+    // Drops non-target columns, for use case: INSERT INTO (col1, col2) SELECT * FROM ...
+    private static RelNode dropFields(RelNode relNode, ImmutableList<Pair<Integer, String>> targetFields) {
       List<Integer> cols = new ArrayList<>();
       int i = 0;
+      Set<String> targetFieldNames = targetFields.stream().map(x -> x.right).collect(Collectors.toSet());
       for (RelDataTypeField field : relNode.getRowType().getFieldList()) {
-        if (!field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
+        if (targetFieldNames.contains(field.getName())
+            && !field.getType().getSqlTypeName().equals(SqlTypeName.NULL)) {
           cols.add(i);
         }
         i++;
       }
-      return RelOptUtil.createProject(relNode, cols);
+      return createForceProject(relNode, cols);
     }
   }
 
+  static RelNode createForceProject(final RelNode child, final List<Integer> posList) {
+    return createForceProject(RelFactories.DEFAULT_PROJECT_FACTORY, child, posList);
+  }
+
+  // By default, "projectNamed" will try to provide an optimization by not creating a new project if the
+  // field types are the same. This is not desirable in the insert case as the field names need to match the sink.
+  //
+  // Example:
+  //   INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT * FROM `KAFKA`.`existing-topic-1`;
+  // Without forced projection this will get optimized to:
+  //   INSERT INTO `my-store` (`KEYFIELD`, `VARCHARFIELD`) SELECT * FROM `KAFKA`.`existing-topic-1`;
+  // With forced projection this will resolve as:
+  //   INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT `KEYFIELD` AS `KEY_id`, \
+  //       `VARCHARFIELD` AS `stringField` FROM `KAFKA`.`existing-topic-1`;
+  //
+  // This implementation is largely a duplicate of RelOptUtil.createProject(relNode, cols), which does not allow
+  // overriding the "force" argument of "projectNamed".
+  static RelNode createForceProject(final RelFactories.ProjectFactory factory,
+      final RelNode child, final List<Integer> posList) {
+    RelDataType rowType = child.getRowType();
+    final List<String> fieldNames = rowType.getFieldNames();
+    final RelBuilder relBuilder =
+        RelBuilder.proto(factory).create(child.getCluster(), null);
+    final List<RexNode> exprs = new AbstractList<RexNode>() {
+      @Override public int size() {
+        return posList.size();
+      }
+
+      @Override public RexNode get(int index) {
+        final int pos = posList.get(index);
+        return relBuilder.getRexBuilder().makeInputRef(child, pos);
+      }
+    };
+    final List<String> names = Util.select(fieldNames, posList);
+    return relBuilder
+        .push(child)
+        .projectNamed(exprs, names, true)
+        .build();
+  }
+
   /** Implements a CREATE DATABASE IF NOT EXISTS statement */
   class DatabaseImplementor implements ScriptImplementor {
     private final String database;
@@ -428,7 +494,7 @@ public ColumnListImplementor(List<RelDataTypeField> fields) {
 
     @Override
     public void implement(SqlWriter w) {
-      SqlWriter.Frame frame1 = w.startList("(", ")");
+      SqlWriter.Frame frame1 = w.startList("(", ")");
       List<String> fieldNames = fields.stream()
           .map(x -> x.getName())
           .map(x -> x.replaceAll("\\$", "_")) // support FOO$BAR columns as FOO_BAR
diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
index 968273c..67aa055 100644
--- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
+++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java
@@ -5,8 +5,6 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-import com.linkedin.hoptimator.util.planner.ScriptImplementor;
-
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -14,9 +12,10 @@
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.Litmus;
-
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.linkedin.hoptimator.util.planner.ScriptImplementor;
 
 
 public class TestDataTypeUtils {
@@ -33,7 +32,7 @@ public void flattenUnflatten() {
     builder3.add("FOO", builder1.build());
     builder3.add("BAR", builder2.build());
     RelDataType rowType = builder3.build();
-    Assertions.assertEquals(2, rowType.getFieldList().size());
+    Assertions.assertEquals(2, rowType.getFieldList().size());
     RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
     Assertions.assertEquals(3, flattenedType.getFieldList().size());
     List<String> flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName())
@@ -65,7 +64,7 @@ public void flattenNestedArrays() {
     builder3.add("FOO", typeFactory.createArrayType(builder1.build(), -1));
     builder3.add("BAR", typeFactory.createArrayType(builder2.build(), -1));
     RelDataType rowType = builder3.build();
-    Assertions.assertEquals(2, rowType.getFieldList().size());
+    Assertions.assertEquals(2, rowType.getFieldList().size());
     RelDataType flattenedType = DataTypeUtils.flatten(rowType, typeFactory);
     Assertions.assertEquals(2, flattenedType.getFieldList().size());
     List<String> flattenedNames = flattenedType.getFieldList().stream().map(x -> x.getName())
@@ -77,5 +76,5 @@ public void flattenNestedArrays() {
     Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `T1` (`FOO` ANY ARRAY, "
         + "`BAR` ANY ARRAY) WITH ();", flattenedConnector,
         "Flattened connector should have simplified arrays");
-  }
-}
+  }
+}
diff --git a/hoptimator-venice/build.gradle b/hoptimator-venice/build.gradle
new file mode 100644
index 0000000..6e4cb11
--- /dev/null
+++ b/hoptimator-venice/build.gradle
@@ -0,0 +1,88 @@
+plugins {
+  id 'java-library'
+  id 'maven-publish'
+}
+
+dependencies {
+  implementation project(':hoptimator-avro')
+  implementation project(':hoptimator-util')
+  implementation libs.calcite.core
+  implementation(libs.venice) {
+    // Venice pulls in snakeyaml v2.0 which has conflicting APIs with 1.x
+    exclude group: 'org.yaml'
+  }
+  implementation libs.venice.client
+
+  testImplementation libs.junit
+  testImplementation libs.assertj
+
+  // These are included in case the demo databases are deployed.
+  testRuntimeOnly project(':hoptimator-demodb')
+
+  testRuntimeOnly project(':hoptimator-k8s')
+  testImplementation(testFixtures(project(':hoptimator-jdbc')))
+  testImplementation(platform('org.junit:junit-bom:5.11.3'))
+  testImplementation 'org.junit.jupiter:junit-jupiter'
+  testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
+  testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+}
+
+tasks.register('intTest', Test) {
+  description = 'Runs integration tests.'
+  group = 'verification'
+
+  shouldRunAfter test
+
+  useJUnitPlatform {
+    includeTags 'integration'
+  }
+
+  testLogging {
+    events "passed", "skipped", "failed"
+  }
+}
+
+publishing {
+  repositories {
+    maven {
+      name 'GitHubPackages'
+      url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
+      credentials {
+        username = System.getenv('GITHUB_ACTOR')
+        password = System.getenv('GITHUB_TOKEN')
+      }
+    }
+    maven {
+      name 'LinkedInJFrog'
+      url 'https://linkedin.jfrog.io/artifactory/hoptimator'
+      credentials {
+        username = System.getenv('JFROG_USERNAME')
+        password = System.getenv('JFROG_API_KEY')
+      }
+    }
+  }
+  publications {
+    maven(MavenPublication) {
+      groupId = 'com.linkedin.hoptimator'
+      artifactId = 'hoptimator-venice'
+      version = System.getenv('VERSION')
+      from components.java
+      pom {
+        name = 'LinkedIn Hoptimator'
+        description = 'Multi-hop declarative data pipelines'
+        url = 'https://github.com/linkedin/Hoptimator'
+        licenses {
+          license {
+            name = 'BSD 2-Clause'
+            url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
+          }
+        }
+        scm {
+          connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
+          developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
+          url = 'https://github.com/linkedin/Hoptimator'
+        }
+      }
+    }
+  }
+}
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java
new file mode 100644
index 0000000..f83afc4
--- /dev/null
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/ClusterSchema.java
@@ -0,0 +1,83 @@
+package com.linkedin.hoptimator.venice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.venice.client.schema.StoreSchemaFetcher;
+import com.linkedin.venice.client.store.ClientConfig;
+import com.linkedin.venice.client.store.ClientFactory;
+import com.linkedin.venice.controllerapi.ControllerClient;
+import com.linkedin.venice.controllerapi.ControllerClientFactory;
+import com.linkedin.venice.security.SSLFactory;
+import com.linkedin.venice.utils.SslUtils;
+
+
+public class ClusterSchema extends AbstractSchema {
+  private static final Logger log = LoggerFactory.getLogger(ClusterSchema.class);
+
+  protected static final String SSL_FACTORY_CLASS_NAME = "ssl.factory.class.name";
+  protected static final String DEFAULT_SSL_FACTORY_CLASS_NAME = "com.linkedin.venice.security.DefaultSSLFactory";
+  protected final Properties properties;
+  protected final Map<String, Table> tableMap = new HashMap<>();
+
+  public ClusterSchema(Properties properties) {
+    this.properties = properties;
+  }
+
+  public void populate() throws InterruptedException, ExecutionException, IOException {
+    tableMap.clear();
+    String cluster = properties.getProperty("cluster");
+    log.info("Loading Venice stores for cluster {}", cluster);
+
+    String sslConfigPath = properties.getProperty("ssl-config-path");
+    Optional<SSLFactory> sslFactory = Optional.empty();
+    if (sslConfigPath != null) {
+      log.info("Using ssl configs at {}", sslConfigPath);
+      Properties sslProperties = SslUtils.loadSSLConfig(sslConfigPath);
+      String sslFactoryClassName = sslProperties.getProperty(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME);
+      sslFactory = Optional.of(SslUtils.getSSLFactory(sslProperties, sslFactoryClassName));
+    }
+
+    try (ControllerClient controllerClient = createControllerClient(cluster, sslFactory)) {
+      String[] stores = controllerClient.queryStoreList(false).getStores();
+      log.info("Loaded {} Venice stores.", stores.length);
+      for (String store : stores) {
+        StoreSchemaFetcher storeSchemaFetcher = createStoreSchemaFetcher(store);
+        tableMap.put(store, createVeniceStore(storeSchemaFetcher));
+      }
+    }
+  }
+
+  protected ControllerClient createControllerClient(String cluster, Optional<SSLFactory> sslFactory) {
+    String routerUrl = properties.getProperty("router.url");
+    if (routerUrl.contains("localhost")) {
+      return new LocalControllerClient(cluster, routerUrl, sslFactory);
+    } else {
+      return ControllerClientFactory.getControllerClient(cluster, routerUrl, sslFactory);
+    }
+  }
+
+  protected StoreSchemaFetcher createStoreSchemaFetcher(String storeName) {
+    return ClientFactory.createStoreSchemaFetcher(
+        ClientConfig.defaultGenericClientConfig(storeName)
+            .setVeniceURL(properties.getProperty("router.url")));
+  }
+
+  protected VeniceStore createVeniceStore(StoreSchemaFetcher storeSchemaFetcher) {
+    return new VeniceStore(storeSchemaFetcher);
+  }
+
+  @Override
+  public Map<String, Table> getTableMap() {
+    return tableMap;
+  }
+}
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java
new file mode 100644
index 0000000..67f081f
--- /dev/null
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/LocalControllerClient.java
@@ -0,0 +1,26 @@
+package com.linkedin.hoptimator.venice;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+import com.linkedin.venice.controllerapi.ControllerClient;
+import com.linkedin.venice.security.SSLFactory;
+
+
+public class LocalControllerClient extends ControllerClient {
+
+  public LocalControllerClient(String clusterName, String discoveryUrls, Optional<SSLFactory> sslFactory) {
+    super(clusterName, discoveryUrls, sslFactory);
+  }
+
+  @Override
+  protected String discoverLeaderController() {
+    try {
+      URL controllerUrl = new URL(super.discoverLeaderController());
+      return controllerUrl.getProtocol() + "://localhost:" + controllerUrl.getPort();
+    } catch (MalformedURLException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+}
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java
new file mode 100644
index 0000000..3a24429
--- /dev/null
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceDriver.java
@@ -0,0 +1,70 @@
+package com.linkedin.hoptimator.venice;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.calcite.avatica.ConnectStringParser;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.Driver;
+import org.apache.calcite.schema.SchemaPlus;
+
+
+/** JDBC driver for Venice stores. */
+public class VeniceDriver extends Driver {
+
+  public static final String CATALOG_NAME = "VENICE";
+
+  static {
+    new VeniceDriver().register();
+  }
+
+  @Override
+  protected String getConnectStringPrefix() {
+    return "jdbc:venice://";
+  }
+
+  @Override
+  protected DriverVersion createDriverVersion() {
+    return DriverVersion.load(this.getClass(), "venice.properties", "venice", "0", "venice", "0");
+  }
+
+  @Override
+  public Connection connect(String url, Properties props) throws SQLException {
+    if (!url.startsWith(getConnectStringPrefix())) {
+      return null;
+    }
+    Properties properties = ConnectStringParser.parse(url.substring(getConnectStringPrefix().length()));
+    String cluster = properties.getProperty("cluster");
+    if (cluster == null) {
+      throw new IllegalArgumentException("Missing required cluster property. Need: jdbc:venice://cluster=...");
+    }
+    cluster = cluster.toUpperCase(Locale.ROOT);
+    if (!cluster.startsWith(CATALOG_NAME)) {
+      cluster = CATALOG_NAME + "-" + cluster;
+    }
+    try {
+      Connection connection = super.connect(url, props);
+      if (connection == null) {
+        throw new IOException("Could not connect to " + url);
+      }
+      connection.setAutoCommit(true); // to prevent rollback()
+      connection.setCatalog(CATALOG_NAME);
+      CalciteConnection calciteConnection = (CalciteConnection) connection;
+      SchemaPlus rootSchema = calciteConnection.getRootSchema();
+      ClusterSchema schema = createClusterSchema(properties);
+      schema.populate();
+      rootSchema.add(cluster.toUpperCase(Locale.ROOT), schema);
+      return connection;
+    } catch (Exception e) {
+      throw new SQLException("Problem loading " + url, e);
+    }
+  }
+
+  protected ClusterSchema createClusterSchema(Properties properties) {
+    return new ClusterSchema(properties);
+  }
+}
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java
new file mode 100644
index 0000000..e4d97cf
--- /dev/null
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java
@@ -0,0 +1,41 @@
+package com.linkedin.hoptimator.venice;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.linkedin.hoptimator.avro.AvroConverter;
+import com.linkedin.hoptimator.util.DataTypeUtils;
+import com.linkedin.venice.client.schema.StoreSchemaFetcher;
+
+
+/** A batch of records from a Venice store. */
+public class VeniceStore extends AbstractTable {
+
+  private final StoreSchemaFetcher storeSchemaFetcher;
+
+  public VeniceStore(StoreSchemaFetcher storeSchemaFetcher) {
+    this.storeSchemaFetcher = storeSchemaFetcher;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    Schema keySchema = storeSchemaFetcher.getKeySchema();
+    Schema valueSchema = storeSchemaFetcher.getLatestValueSchema();
+
+    // Venice contains both a key schema and a value schema. Since we need to pass back one joint schema,
+    // and to avoid name collisions, all key fields are structured as "KEY$foo".
diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java
new file mode 100644
index 0000000..e4d97cf
--- /dev/null
+++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java
@@ -0,0 +1,41 @@
+package com.linkedin.hoptimator.venice;
+
+import org.apache.avro.Schema;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.impl.AbstractTable;
+
+import com.linkedin.hoptimator.avro.AvroConverter;
+import com.linkedin.hoptimator.util.DataTypeUtils;
+import com.linkedin.venice.client.schema.StoreSchemaFetcher;
+
+
+/** A batch of records from a Venice store. */
+public class VeniceStore extends AbstractTable {
+
+  private final StoreSchemaFetcher storeSchemaFetcher;
+
+  public VeniceStore(StoreSchemaFetcher storeSchemaFetcher) {
+    this.storeSchemaFetcher = storeSchemaFetcher;
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    Schema keySchema = storeSchemaFetcher.getKeySchema();
+    Schema valueSchema = storeSchemaFetcher.getLatestValueSchema();
+
+    // Venice stores carry both a key schema and a value schema. Since we must return one joint
+    // row type, key fields are nested under KEY and flattened to "KEY$foo" to avoid collisions.
+    RelDataType key = rel(keySchema, typeFactory);
+    RelDataType value = rel(valueSchema, typeFactory);
+    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
+    builder.addAll(value.getFieldList());
+    builder.add("KEY", key);
+    RelDataType combinedSchema = builder.build();
+    return DataTypeUtils.flatten(combinedSchema, typeFactory);
+  }
+
+  protected RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
+    return AvroConverter.rel(schema, typeFactory);
+  }
+}
diff --git a/hoptimator-venice/src/main/resources/META-INF/services/java.sql.Driver b/hoptimator-venice/src/main/resources/META-INF/services/java.sql.Driver
new file mode 100644
index 0000000..37636c7
--- /dev/null
+++ b/hoptimator-venice/src/main/resources/META-INF/services/java.sql.Driver
@@ -0,0 +1 @@
+com.linkedin.hoptimator.venice.VeniceDriver
diff --git a/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java
new file mode 100644
index 0000000..27479ba
--- /dev/null
+++ b/hoptimator-venice/src/test/java/com/linkedin/hoptimator/venice/TestSqlScripts.java
@@ -0,0 +1,28 @@
+package com.linkedin.hoptimator.venice;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import com.linkedin.hoptimator.jdbc.QuidemTestBase;
+
+
+public class TestSqlScripts extends QuidemTestBase {
+
+  @Test
+  @Tag("integration")
+  public void veniceDdlSelectScript() throws Exception {
+    run("venice-ddl-select.id");
+  }
+
+  @Test
+  @Tag("integration")
+  public void veniceDdlInsertAllScript() throws Exception {
+    run("venice-ddl-insert-all.id");
+  }
+
+  @Test
+  @Tag("integration")
+  public void veniceDdlInsertPartialScript() throws Exception {
+    run("venice-ddl-insert-partial.id");
+  }
+}
diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
new file mode 100644
index 0000000..49254fe
--- /dev/null
+++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
@@ -0,0 +1,34 @@
+!set outputformat mysql
+!use k8s
+
+insert into "VENICE-CLUSTER0"."test-store-1" select * from "VENICE-CLUSTER0"."test-store";
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: venice-cluster0-test-store-1
+spec:
+  image: docker.io/library/hoptimator-flink-runner
+  imagePullPolicy: Never
+  flinkVersion: v1_16
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+    - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - INSERT INTO `test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
+    jarURI: local:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+!specify
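Editor's note: the "KEY$id" column in these scripts is the flattened key field produced by VeniceStore.getRowType() above, which the generated Flink DDL then surfaces as `KEY_id` with 'key.fields-prefix'='KEY_'. To make the flattening concrete, here is a small sketch using the same AvroConverter and DataTypeUtils helpers; the Avro schemas are hypothetical stand-ins for the test stores:

// FlattenDemo.java -- sketch of the key/value row-type combination; schemas are assumptions.
import org.apache.avro.Schema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;

import com.linkedin.hoptimator.avro.AvroConverter;
import com.linkedin.hoptimator.util.DataTypeUtils;

public class FlattenDemo {
  public static void main(String[] args) {
    // Hypothetical Venice key and value schemas, shaped like the test stores above.
    Schema key = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"K\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    Schema value = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"V\",\"fields\":["
            + "{\"name\":\"intField\",\"type\":\"int\"},"
            + "{\"name\":\"stringField\",\"type\":\"string\"}]}");
    RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
    // Same steps as VeniceStore.getRowType(): value fields first, then the key nested under KEY.
    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
    builder.addAll(AvroConverter.rel(value, typeFactory).getFieldList());
    builder.add("KEY", AvroConverter.rel(key, typeFactory));
    RelDataType flattened = DataTypeUtils.flatten(builder.build(), typeFactory);
    System.out.println(flattened.getFieldNames()); // expect: [intField, stringField, KEY$id]
  }
}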
diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id
new file mode 100644
index 0000000..be1fa60
--- /dev/null
+++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id
@@ -0,0 +1,34 @@
+!set outputformat mysql
+!use k8s
+
+insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "intField") select "KEY$id", "stringField" from "VENICE-CLUSTER0"."test-store";
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: venice-cluster0-test-store-1
+spec:
+  image: docker.io/library/hoptimator-flink-runner
+  imagePullPolicy: Never
+  flinkVersion: v1_16
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+    - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - INSERT INTO `test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS INTEGER) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
+    jarURI: local:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+!specify
diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id
new file mode 100644
index 0000000..6327504
--- /dev/null
+++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id
@@ -0,0 +1,34 @@
+!set outputformat mysql
+!use k8s
+
+select * from "VENICE-CLUSTER0"."test-store-1";
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: pipeline-sink
+spec:
+  image: docker.io/library/hoptimator-flink-runner
+  imagePullPolicy: Never
+  flinkVersion: v1_16
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 0.1
+  job:
+    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
+    args:
+    - CREATE TABLE IF NOT EXISTS `test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
+    - CREATE TABLE IF NOT EXISTS `SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
+    - INSERT INTO `SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
+    jarURI: local:///opt/hoptimator-flink-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
+    state: running
+!specify
diff --git a/settings.gradle b/settings.gradle
index 94efd91..b55afb1 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,9 +17,13 @@ include 'hoptimator-operator'
 include 'hoptimator-operator-integration'
 include 'hoptimator-planner' // <-- marked for deletion
 include 'hoptimator-util'
+include 'hoptimator-venice'
 
 dependencyResolutionManagement {
   repositories {
     mavenCentral()
+    maven {
+      url "https://linkedin.jfrog.io/artifactory/open-source"
+    }
   }
 }
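Editor's note: the extra maven repository is what lets Gradle resolve the Venice client classes used above (ControllerClient, StoreSchemaFetcher, and friends), which LinkedIn publishes to its JFrog instance rather than Maven Central. A build.gradle sketch for the new module; the artifact coordinates and version property are assumptions, not taken from this diff:

// hoptimator-venice/build.gradle (sketch; coordinates and veniceVersion are hypothetical)
dependencies {
  // Resolved from https://linkedin.jfrog.io/artifactory/open-source added in settings.gradle.
  implementation "com.linkedin.venice:venice-thin-client:${veniceVersion}"
}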