diff --git a/Makefile b/Makefile
index 933178a..c37c76f 100644
--- a/Makefile
+++ b/Makefile
@@ -18,40 +18,57 @@ integration-tests:
clean:
./gradlew clean
-deploy-demo:
- kubectl apply -f ./deploy/samples/demodb.yaml
+deploy-config:
+ kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -
deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy
-undeploy:
- kubectl delete -f ./deploy || echo "skipping"
- kubectl delete configmap hoptimator-configmap || echo "skipping"
-
quickstart: build deploy
-deploy-dev-environment:
+deploy-demo: deploy
+ kubectl apply -f ./deploy/samples/demodb.yaml
+
+deploy-samples: deploy
+ kubectl wait --for=condition=Established=True \
+ crds/subscriptions.hoptimator.linkedin.com \
+ crds/kafkatopics.hoptimator.linkedin.com \
+ crds/sqljobs.hoptimator.linkedin.com
+ kubectl apply -f ./deploy/samples
+
+deploy-dev-environment: deploy-config
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
+ kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
+ kubectl apply -f ./deploy/samples/demodb.yaml
+ kubectl apply -f ./deploy/samples/kafkadb.yaml
-deploy-samples: deploy
- kubectl wait --for=condition=Established=True \
- crds/subscriptions.hoptimator.linkedin.com \
- crds/kafkatopics.hoptimator.linkedin.com \
- crds/sqljobs.hoptimator.linkedin.com
- kubectl apply -f ./deploy/samples
+undeploy-dev-environment:
+ kubectl delete $(shell kubectl get kafkatopic.kafka.strimzi.io -o name -n kafka) -n kafka || echo "skipping"
+ kubectl delete $(shell kubectl get strimzi -o name -n kafka) -n kafka || echo "skipping"
+ kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
+ kubectl delete -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka || echo "skipping"
+ kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping"
+ kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
+ kubectl delete -f ./deploy/dev || echo "skipping"
+ kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
+ kubectl delete namespace kafka || echo "skipping"
+ helm uninstall flink-kubernetes-operator || echo "skipping"
+ kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
-deploy-config:
- kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -
+undeploy: undeploy-dev-environment
+ kubectl delete -f ./deploy || echo "skipping"
+ kubectl delete configmap hoptimator-configmap || echo "skipping"
generate-models:
- ./models/generate-models.sh
+ ./generate-models.sh
+ ./hoptimator-models/generate-models.sh # <-- marked for deletion
release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
diff --git a/README.md b/README.md
index 12fab71..bb25e84 100644
--- a/README.md
+++ b/README.md
@@ -39,11 +39,24 @@ Materialized views result in `pipelines`:
Hoptimator requires a Kubernetes cluster. To connect from outside a Kubernetes cluster, make sure your `kubectl` is properly configured.
+The setup below installs two local demo databases, ads and profiles.
+
```
$ make install # build and install SQL CLI
- $ make deploy deploy-demo # install CRDs and K8s objects
- $ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 &
- $ ./hoptimator
+ $ make deploy deploy-demo # install demo DB CRDs and K8s objects
+ $ ./hoptimator # start the SQL CLI
+ > !intro
+```
+
+## Set up Kafka & Flink clusters
+
+The setup below installs Kafka and Flink clusters inside Kubernetes.
+
+```
+ $ make install # build and install SQL CLI
+ $ make deploy-dev-environment # start local Kafka & Flink setups
+ $ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
+ $ ./hoptimator # start the SQL CLI
> !intro
```
diff --git a/build.gradle b/build.gradle
index be80bcc..033cbd5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,6 +1,11 @@
-
+plugins {
+ id("com.github.spotbugs") version "6.0.26"
+}
subprojects {
+ apply plugin: 'checkstyle'
+ apply plugin: 'com.github.spotbugs'
+
tasks.withType(JavaCompile) {
options.release = 8
options.compilerArgs << '-Xlint:deprecation'
@@ -9,4 +14,12 @@ subprojects {
tasks.withType(Javadoc) {
options.addStringOption('Xdoclint:none', '-quiet')
}
+
+ spotbugs {
+ ignoreFailures = true
+ showProgress = true
+ reportsDir = file("$buildDir/spotbugs")
+ includeFilter = file("$rootDir/config/spotbugs/include.xml")
+ excludeFilter = file("$rootDir/config/spotbugs/exclude.xml")
+ }
}
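
With `ignoreFailures = true`, the new SpotBugs step reports findings without failing the build. As a hedged illustration of the kind of pattern it flags (EI_EXPOSE_REP/EI_EXPOSE_REP2, exposing mutable internal state), consider this hypothetical class, which is not part of this change:

```
import java.util.Date;

// Hypothetical class for illustration only.
public class Event {
  private final Date when;

  public Event(Date when) {
    this.when = when; // EI_EXPOSE_REP2: stores a caller-mutable Date
  }

  public Date when() {
    return when; // EI_EXPOSE_REP: returns internal mutable state
  }
}
```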
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
new file mode 100644
index 0000000..32c5a12
--- /dev/null
+++ b/config/checkstyle/checkstyle.xml
@@ -0,0 +1,237 @@
+<!-- Checkstyle module configuration (237 lines; XML content not recoverable) -->
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
new file mode 100644
index 0000000..1a9612b
--- /dev/null
+++ b/config/checkstyle/suppressions.xml
@@ -0,0 +1,7 @@
+<!-- Checkstyle suppressions (XML content not recoverable) -->
diff --git a/config/spotbugs/exclude.xml b/config/spotbugs/exclude.xml
new file mode 100644
index 0000000..52c8abf
--- /dev/null
+++ b/config/spotbugs/exclude.xml
@@ -0,0 +1,10 @@
+<!-- SpotBugs exclude filter (XML content not recoverable) -->
\ No newline at end of file
diff --git a/config/spotbugs/include.xml b/config/spotbugs/include.xml
new file mode 100644
index 0000000..5f45c43
--- /dev/null
+++ b/config/spotbugs/include.xml
@@ -0,0 +1,11 @@
+<!-- SpotBugs include filter (XML content not recoverable) -->
diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml
index d24fbb0..237b920 100644
--- a/deploy/dev/kafka.yaml
+++ b/deploy/dev/kafka.yaml
@@ -26,6 +26,10 @@ spec:
version: 3.8.0
replicas: 1
listeners:
+ - name: plain
+ port: 9094
+ type: internal
+ tls: false
- name: tls
port: 9093
type: internal
diff --git a/deploy/subscriptions.crd.yaml b/deploy/subscriptions.crd.yaml
new file mode 100644
index 0000000..67943d9
--- /dev/null
+++ b/deploy/subscriptions.crd.yaml
@@ -0,0 +1,106 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+ name: subscriptions.hoptimator.linkedin.com
+spec:
+ group: hoptimator.linkedin.com
+ names:
+ kind: Subscription
+ listKind: SubscriptionList
+ plural: subscriptions
+ singular: subscription
+ shortNames:
+ - sub
+ - subs
+ preserveUnknownFields: false
+ scope: Namespaced
+ versions:
+ - name: v1alpha1
+ served: true
+ storage: true
+ schema:
+ openAPIV3Schema:
+ description: Hoptimator Subscription
+ type: object
+ properties:
+ apiVersion:
+ type: string
+ kind:
+ type: string
+ metadata:
+ type: object
+ spec:
+ description: Subscription spec
+ type: object
+ properties:
+ sql:
+ description: A single SQL query.
+ type: string
+ database:
+ description: The database in which to create the output/sink table.
+ type: string
+ hints:
+ description: Hints to adapters, which may disregard them.
+ type: object
+ additionalProperties:
+ type: string
+ required:
+ - sql
+ - database
+ status:
+ description: Filled in by the operator.
+ type: object
+ properties:
+ ready:
+ description: Whether the subscription is ready to be consumed.
+ type: boolean
+ failed:
+ description: Indicates that the operator was unable to deploy a pipeline for this subscription.
+ type: boolean
+ message:
+ description: Error or success message, for information only.
+ type: string
+ sql:
+ description: The SQL being implemented by this pipeline.
+ type: string
+ hints:
+ description: The hints being used by this pipeline.
+ type: object
+ additionalProperties:
+ type: string
+ attributes:
+ description: Physical attributes of the job and sink/output table.
+ type: object
+ additionalProperties:
+ type: string
+ resources:
+ description: The yaml generated to implement this pipeline.
+ type: array
+ items:
+ type: string
+ jobResources:
+ description: The yaml generated to implement the job.
+ type: array
+ items:
+ type: string
+ downstreamResources:
+ description: The yaml generated to implement the sink/output table.
+ type: array
+ items:
+ type: string
+ subresources:
+ status: {}
+ additionalPrinterColumns:
+ - name: STATUS
+ type: string
+ description: Status message from the operator.
+ jsonPath: .status.message
+ - name: DB
+ type: string
+ description: The database where the subscription is materialized.
+ jsonPath: .spec.database
+ - name: SQL
+ type: string
+ description: The SQL query that the subscription materializes.
+ jsonPath: .spec.sql
+
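
As a sketch of how a Subscription defined by this CRD could be read programmatically, assuming the official `io.kubernetes:client-java` library (not necessarily what this repo uses) and its classic five-argument `getNamespacedCustomObject` signature (newer client versions use a fluent request builder instead); the namespace and resource name are hypothetical:

```
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.util.Config;

public class ReadSubscription {
  public static void main(String[] args) throws Exception {
    ApiClient client = Config.defaultClient(); // reads local kubeconfig
    CustomObjectsApi api = new CustomObjectsApi(client);
    Object sub = api.getNamespacedCustomObject(
        "hoptimator.linkedin.com", // group, per this CRD
        "v1alpha1",                // served version
        "default",                 // namespace (assumed)
        "subscriptions",           // plural, per this CRD
        "my-subscription");        // resource name (hypothetical)
    System.out.println(sub);
  }
}
```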
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java
index 9523201..0119525 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Catalog.java
@@ -1,12 +1,15 @@
package com.linkedin.hoptimator;
-import java.sql.Wrapper;
import java.sql.SQLException;
+import java.sql.Wrapper;
+
/** Registers a set of tables, possibly within schemas and sub-schemas. */
public interface Catalog {
String name();
+
String description();
+
void register(Wrapper parentSchema) throws SQLException;
}
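
A minimal sketch of what an implementation of this interface looks like; the `DemoCatalog` class and its behavior are illustrative, not part of this change:

```
import java.sql.SQLException;
import java.sql.Wrapper;

import com.linkedin.hoptimator.Catalog;

// Illustrative implementation; registers no tables.
public class DemoCatalog implements Catalog {

  @Override
  public String name() {
    return "demo";
  }

  @Override
  public String description() {
    return "An empty demo catalog.";
  }

  @Override
  public void register(Wrapper parentSchema) throws SQLException {
    // A real catalog would unwrap parentSchema and register tables here.
  }
}
```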
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java
index 61a9f2a..e5fb87f 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/CatalogProvider.java
@@ -2,6 +2,7 @@
import java.util.Collection;
+
public interface CatalogProvider {
Collection<Catalog> catalogs();
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java
index ff84fe7..25b6678 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Connector.java
@@ -1,9 +1,10 @@
package com.linkedin.hoptimator;
-import java.util.Map;
import java.sql.SQLException;
+import java.util.Map;
+
public interface Connector<T> {
-
+
Map<String, String> configure(T t) throws SQLException;
}
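
A minimal sketch of a Connector, assuming the `Map<String, String>` return type reconstructed above; the class and the `datagen` option are illustrative:

```
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import com.linkedin.hoptimator.Connector;

// Illustrative connector that emits static table options.
public class DemoConnector<T> implements Connector<T> {

  @Override
  public Map<String, String> configure(T t) throws SQLException {
    Map<String, String> options = new HashMap<>();
    options.put("connector", "datagen"); // placeholder option
    return options;
  }
}
```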
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java
index 3fca89a..8ddb32c 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ConnectorProvider.java
@@ -2,6 +2,7 @@
import java.util.Collection;
+
public interface ConnectorProvider {
<T> Collection<Connector<T>> connectors(Class<T> clazz);
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Database.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Database.java
index fd86441..f9b1317 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Database.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Database.java
@@ -1,6 +1,6 @@
package com.linkedin.hoptimator;
-/** A collection of tables, as populated by a Catalog. */
+/** A collection of tables, as populated by a Catalog. */
public interface Database {
/** Name of the database. */
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployable.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployable.java
index 570f421..5be7423 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployable.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployable.java
@@ -1,12 +1,15 @@
package com.linkedin.hoptimator;
-import java.util.List;
import java.sql.SQLException;
+import java.util.List;
+
public interface Deployable {
void create() throws SQLException;
+
void delete() throws SQLException;
+
void update() throws SQLException;
/** Render a list of specs, usually YAML. */
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java
index 8354664..abbe0bf 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Deployer.java
@@ -1,12 +1,16 @@
package com.linkedin.hoptimator;
-import java.util.List;
import java.sql.SQLException;
+import java.util.List;
+
public interface Deployer<T> {
-
+
void create(T t) throws SQLException;
+
void update(T t) throws SQLException;
+
void delete(T t) throws SQLException;
+
+ List<String> specify(T t) throws SQLException;
}
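
A hedged sketch of a Deployer that renders specs without touching a cluster; the `LoggingDeployer` class is hypothetical:

```
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

import com.linkedin.hoptimator.Deployer;

// Illustrative deployer that prints specs instead of applying them.
public class LoggingDeployer<T> implements Deployer<T> {

  @Override
  public void create(T t) throws SQLException {
    specify(t).forEach(System.out::println);
  }

  @Override
  public void update(T t) throws SQLException {
    create(t);
  }

  @Override
  public void delete(T t) throws SQLException {
    // Nothing to clean up in this sketch.
  }

  @Override
  public List<String> specify(T t) throws SQLException {
    return Collections.singletonList("# spec for " + t); // placeholder YAML
  }
}
```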
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java
index 8367f1c..341d21a 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/DeployerProvider.java
@@ -2,6 +2,7 @@
import java.util.Collection;
+
public interface DeployerProvider {
<T> Collection<Deployer<T>> deployers(Class<T> clazz);
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
index 35118e4..204e8bf 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Validator.java
@@ -3,8 +3,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -12,7 +12,7 @@
public interface Validator<T> {
-
+
void validate(T t, Issues issues);
static void validateSubdomainName(String s, Issues issues) {
@@ -142,7 +142,7 @@ private String fullPath() {
Collections.reverse(parts);
return String.join("/", parts);
}
-
+
private String format(int indentLevel) {
if (empty()) {
return "";
diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
index 8d3935b..95dee2f 100644
--- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
+++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/ValidatorProvider.java
@@ -2,6 +2,7 @@
import java.util.Collection;
+
public interface ValidatorProvider {
<T> Collection<Validator<T>> validators(Class<T> clazz);
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
index 3370b5a..520b343 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java
@@ -1,19 +1,19 @@
package com.linkedin.hoptimator.avro;
-import org.apache.avro.Schema;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.stream.Collectors;
/** Converts between Avro and Calcite's RelDataType */
public final class AvroConverter {
@@ -23,41 +23,43 @@ private AvroConverter() {
public static Schema avro(String namespace, String name, RelDataType dataType) {
if (dataType.isStruct()) {
- List<Schema.Field> fields = dataType.getFieldList().stream()
- .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
- .collect(Collectors.toList());
- return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
- dataType.isNullable());
+ List<Schema.Field> fields = dataType.getFieldList()
+ .stream()
+ .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x),
+ null))
+ .collect(Collectors.toList());
+ return createAvroSchemaWithNullability(
+ Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), dataType.isNullable());
} else {
switch (dataType.getSqlTypeName()) {
- case INTEGER:
- return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
- case SMALLINT:
- return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
- case BIGINT:
- return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
- case VARCHAR:
- return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
- case FLOAT:
- return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable());
- case DOUBLE:
- return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
- case CHAR:
- return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
- case BOOLEAN:
- return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
- case ARRAY:
- return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
- dataType.isNullable());
- // TODO support map types
- // Appears to require a Calcite version bump
- // case MAP:
- // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
- case UNKNOWN:
- case NULL:
- return Schema.createUnion(Schema.create(Schema.Type.NULL));
- default:
- throw new UnsupportedOperationException("No support yet for " + dataType.getSqlTypeName().toString());
+ case INTEGER:
+ return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
+ case SMALLINT:
+ return createAvroTypeWithNullability(Schema.Type.INT, dataType.isNullable());
+ case BIGINT:
+ return createAvroTypeWithNullability(Schema.Type.LONG, dataType.isNullable());
+ case VARCHAR:
+ return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
+ case FLOAT:
+ return createAvroTypeWithNullability(Schema.Type.FLOAT, dataType.isNullable());
+ case DOUBLE:
+ return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable());
+ case CHAR:
+ return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable());
+ case BOOLEAN:
+ return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
+ case ARRAY:
+ return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())),
+ dataType.isNullable());
+ // TODO support map types
+ // Appears to require a Calcite version bump
+ // case MAP:
+ // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable());
+ case UNKNOWN:
+ case NULL:
+ return Schema.createUnion(Schema.create(Schema.Type.NULL));
+ default:
+ throw new UnsupportedOperationException("No support yet for " + dataType.getSqlTypeName().toString());
}
}
}
@@ -82,42 +84,43 @@ private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean
public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) {
RelDataType unknown = typeFactory.createUnknownType();
switch (schema.getType()) {
- case RECORD:
- return typeFactory.createStructType(schema.getFields().stream()
- .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
- .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
- .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
- .collect(Collectors.toList()));
- case INT:
- return createRelType(typeFactory, SqlTypeName.INTEGER);
- case LONG:
- return createRelType(typeFactory, SqlTypeName.BIGINT);
- case ENUM:
- case FIXED:
- case STRING:
- return createRelType(typeFactory, SqlTypeName.VARCHAR);
- case FLOAT:
- return createRelType(typeFactory, SqlTypeName.FLOAT);
- case DOUBLE:
- return createRelType(typeFactory, SqlTypeName.DOUBLE);
- case BOOLEAN:
- return createRelType(typeFactory, SqlTypeName.BOOLEAN);
- case ARRAY:
- return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
+ case RECORD:
+ return typeFactory.createStructType(schema.getFields()
+ .stream()
+ .map(x -> new AbstractMap.SimpleEntry<>(x.name(), rel(x.schema(), typeFactory)))
+ .filter(x -> x.getValue().getSqlTypeName() != SqlTypeName.NULL)
+ .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName())
+ .collect(Collectors.toList()));
+ case INT:
+ return createRelType(typeFactory, SqlTypeName.INTEGER);
+ case LONG:
+ return createRelType(typeFactory, SqlTypeName.BIGINT);
+ case ENUM:
+ case FIXED:
+ case STRING:
+ return createRelType(typeFactory, SqlTypeName.VARCHAR);
+ case FLOAT:
+ return createRelType(typeFactory, SqlTypeName.FLOAT);
+ case DOUBLE:
+ return createRelType(typeFactory, SqlTypeName.DOUBLE);
+ case BOOLEAN:
+ return createRelType(typeFactory, SqlTypeName.BOOLEAN);
+ case ARRAY:
+ return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1);
// TODO support map types
// Appears to require a Calcite version bump
// case MAP:
// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory));
- case UNION:
- if (schema.isNullable() && schema.getTypes().size() == 2) {
- Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
- return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
- } else {
- // TODO support more elaborate union types
- return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
- }
- default:
- return typeFactory.createUnknownType();
+ case UNION:
+ if (schema.isNullable() && schema.getTypes().size() == 2) {
+ Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get();
+ return typeFactory.createTypeWithNullability(rel(innerType, typeFactory), true);
+ } else {
+ // TODO support more elaborate union types
+ return typeFactory.createTypeWithNullability(typeFactory.createUnknownType(), true);
+ }
+ default:
+ return typeFactory.createUnknownType();
}
}
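
A usage sketch of the two public entry points above, round-tripping a simple row type through Avro; the `AvroConverterDemo` class is illustrative:

```
import org.apache.avro.Schema;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

import com.linkedin.hoptimator.avro.AvroConverter;

public class AvroConverterDemo {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RelDataType rowType = typeFactory.builder()
        .add("ID", SqlTypeName.BIGINT)
        .add("NAME", SqlTypeName.VARCHAR)
        .build();
    Schema avroSchema = AvroConverter.avro("com.example", "Row", rowType);
    System.out.println(avroSchema.toString(true)); // pretty-printed Avro schema
    RelDataType roundTrip = AvroConverter.rel(avroSchema, typeFactory);
    System.out.println(roundTrip.getFullTypeString());
  }
}
```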
diff --git a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
index 6064314..5472246 100644
--- a/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
+++ b/hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroTableValidator.java
@@ -1,6 +1,8 @@
package com.linkedin.hoptimator.avro;
-import com.linkedin.hoptimator.Validator;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
@@ -11,16 +13,15 @@
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+
+import com.linkedin.hoptimator.Validator;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
/** Validates that tables follow Avro schema evolution rules. */
class AvroTableValidator implements Validator<SchemaPlus> {
-
+
@Override
public void validate(SchemaPlus schema, Issues issues) {
try {
@@ -55,7 +56,7 @@ private void validate(SchemaPlus schema, Table table, Table originalTable, Issue
DataFileWriter