Add Venice support #78

Open · wants to merge 5 commits into main (changes from 4 commits)
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
.gradle
.idea/
/build
*/build/
*/*.iml
./models/external/
68 changes: 50 additions & 18 deletions Makefile
@@ -12,50 +12,63 @@ build:

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

# Integration tests expect K8s and Kafka to be running
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

clean:
./gradlew clean

deploy-config:
kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -

undeploy-config:
kubectl delete configmap hoptimator-configmap || echo "skipping"

deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy

undeploy: undeploy-config
kubectl delete -f ./deploy || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"

quickstart: build deploy

deploy-demo: deploy
kubectl apply -f ./deploy/samples/demodb.yaml

undeploy-demo: undeploy
kubectl delete -f ./deploy/samples/demodb.yaml

deploy-samples: deploy
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
crds/kafkatopics.hoptimator.linkedin.com \
crds/sqljobs.hoptimator.linkedin.com
kubectl apply -f ./deploy/samples

deploy-dev-environment: deploy-config
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
undeploy-samples: undeploy
kubectl delete -f ./deploy/samples || echo "skipping"

deploy-flink:
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

undeploy-flink:
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml

undeploy-dev-environment:
undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
kubectl delete strimzi -n kafka --all || echo "skipping"
kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
@@ -65,12 +78,31 @@ undeploy-dev-environment:
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy: undeploy-dev-environment
kubectl delete -f ./deploy || echo "skipping"
kubectl delete configmap hoptimator-configmap || echo "skipping"
# Deploys a Venice cluster in Docker and creates two stores in Venice. The stores are not managed via K8s for now.
deploy-venice: deploy deploy-flink
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
kubectl apply -f ./deploy/samples/venicedb.yaml

undeploy-venice:
kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy

# Integration tests expect K8s, Kafka, and Venice to be running
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

generate-models:
./generate-models.sh
@@ -80,4 +112,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release
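
Taken together, the new targets give a one-command dev loop. A usage sketch, assuming Docker and a reachable local K8s cluster (minikube here is an assumption, not something the Makefile mandates):

```
$ make build install            # build everything and install the SQL CLI
$ make deploy-dev-environment   # deploy config, Flink, Kafka, and the Venice cluster
$ make integration-tests        # wait for Kafka, port-forward 9092, run ./gradlew intTest
$ make undeploy-dev-environment # tear down Venice, Kafka, Flink, then the base deploy
```

Note that undeploy-dev-environment runs the teardown targets in reverse dependency order (undeploy-venice, undeploy-kafka, undeploy-flink, undeploy), mirroring deploy-dev-environment.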
2 changes: 1 addition & 1 deletion README.md
@@ -54,7 +54,7 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.

```
$ make install # build and install SQL CLI
$ make deploy-dev-environment # start local Kafka & Flink setups
$ make deploy-dev-environment # start all local dev setups
$ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
$ ./hoptimator # start the SQL CLI
> !intro
```
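
With the dev environment running, the new Venice stores should be queryable from the SQL CLI under the VENICE-CLUSTER0 schema declared in venicedb.yaml below. A sketch of such a session; the exact identifier quoting is an assumption:

```
$ ./hoptimator           # start the SQL CLI
> !schemas
> select * from "VENICE-CLUSTER0"."test-store";
```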
84 changes: 84 additions & 0 deletions deploy/docker/docker-compose-single-dc-setup.yaml
@@ -0,0 +1,84 @@
services:
zookeeper:
image: venicedb/apache-zookeeper:3.9.0
container_name: zookeeper
hostname: zookeeper
healthcheck:
test: ["CMD-SHELL", "echo ruok | nc zookeeper 2181"]
start_period: 10s
interval: 5s
timeout: 5s
retries: 5

kafka:
image: venicedb/apache-kafka:3.3.1
container_name: kafka
hostname: kafka
environment:
- ZOOKEEPER_ADDRESS=zookeeper:2181
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "bash -x bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
start_period: 60s
interval: 5s
timeout: 20s
retries: 5

venice-controller:
image: venicedb/venice-controller:0.4.340
container_name: venice-controller
hostname: venice-controller
depends_on:
kafka:
condition: service_healthy
ports:
- 5555:5555
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-server:
image: venicedb/venice-server:0.4.340
container_name: venice-server
hostname: venice-server
depends_on:
venice-controller:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-router:
image: venicedb/venice-router:0.4.340
container_name: venice-router
hostname: venice-router
depends_on:
venice-server:
condition: service_healthy
ports:
- 7777:7777
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-client:
image: venicedb/venice-client:0.4.340
container_name: venice-client
hostname: venice-client
tty: true
volumes:
- ./venice:/opt/venice/schemas
depends_on:
venice-router:
condition: service_healthy
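
Before creating stores, it helps to confirm the stack is healthy. A sketch using the same compose file and the create-store.sh invocation from the deploy-venice target (create-store.sh ships inside the venice-client image; the Avro schemas are mounted from ./deploy/docker/venice to /opt/venice/schemas per the volumes entry above):

```
$ docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
$ docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml ps   # all services should report healthy
$ docker exec venice-client ./create-store.sh http://venice-controller:5555 \
    venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
```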
11 changes: 11 additions & 0 deletions deploy/docker/venice/keySchema.avsc
@@ -0,0 +1,11 @@
{
"type": "record",
"name": "SampleTableKey",
"doc": "SampleTableKey",
"fields": [
{
"name": "id",
"type": "int"
}
]
}

Collaborator: We need to get `create table venice.foo` working :)

Collaborator (author): Agreed, I will spend some time looking into this when I can. Should be a simple API call, just as I'm doing to fetch schemas, just different from the current paradigm since it isn't managed via K8s.
23 changes: 23 additions & 0 deletions deploy/docker/venice/valueSchema.avsc
@@ -0,0 +1,23 @@
{
"type": "record",
"name": "SampleTableValue",
"doc": "SampleTableValue",
"fields": [
{
"name": "intField",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "stringField",
"type": [
"null",
"string"
],
"default": null
}
]
}
25 changes: 25 additions & 0 deletions deploy/samples/venicedb.yaml
@@ -0,0 +1,25 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: venice-cluster0
spec:
schema: VENICE-CLUSTER0
url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
name: venice-template-cluster0
spec:
databases:
- venice-cluster0
connector: |
connector = venice
storeName = {{table}}
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = {{keys}}
value.fields-include: EXCEPT_KEY
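
For the test-store created above, whose key schema has the single field id, the template would expand to roughly the following connector config. This is a sketch: the exact substitution of {{table}} and {{keys}} (in particular, whether key.fields receives the prefixed name KEY_id) is up to Hoptimator's template engine:

```
connector = venice
storeName = test-store
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = KEY_id
value.fields-include: EXCEPT_KEY
```

The resulting table would then expose a KEY_id column plus the nullable intField and stringField from valueSchema.avsc.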
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -29,3 +29,5 @@ slf4j-api = "org.slf4j:slf4j-api:1.7.30"
sqlline = "sqlline:sqlline:1.12.0"
commons-cli = "commons-cli:commons-cli:1.4"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.376"
venice-client = "com.linkedin.venice:venice-thin-client:0.4.376"
4 changes: 2 additions & 2 deletions hoptimator-avro/build.gradle
@@ -42,8 +42,8 @@ publishing {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
@@ -52,4 +52,4 @@
}
}
}
}
}
Resource.java
@@ -20,6 +20,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Represents something required by a Table.
@@ -37,6 +40,7 @@
* for informational/debugging purposes.
*/
public abstract class Resource {
private static final Logger log = LoggerFactory.getLogger(Resource.class);
private final String template;
private final SortedMap<String, Supplier<String>> properties = new TreeMap<>();
private final List<Resource> inputs = new ArrayList<>();
@@ -345,6 +349,9 @@ private static String applyTransform(String value, String transform) {
case "concat":
res = res.replace("\n", "");
break;
default:
log.warn("Transformation function '{}' not found", f);
break;
}
}
return res;
1 change: 1 addition & 0 deletions hoptimator-cli/build.gradle
@@ -12,6 +12,7 @@ dependencies {
implementation project(':hoptimator-demodb')
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-kafka')
implementation project(':hoptimator-venice')
implementation project(':hoptimator-k8s')
implementation project(':hoptimator-util')
implementation libs.calcite.core
10 changes: 5 additions & 5 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
@@ -8,7 +8,7 @@
import java.util.Scanner;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.jline.reader.Completer;

import com.linkedin.hoptimator.SqlDialect;
@@ -87,8 +87,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
try {
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root);
sqlline.output(plan.sql().apply(SqlDialect.ANSI));
} catch (SQLException e) {
sqlline.error(e);
@@ -155,9 +155,9 @@ public void execute(String line, DispatchCallback dispatchCallback) {
}
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
try {
List<String> specs = DeploymentService.plan(rel).pipeline().specify();
List<String> specs = DeploymentService.plan(root).pipeline().specify();
specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
} catch (SQLException e) {
sqlline.error(e);
2 changes: 1 addition & 1 deletion hoptimator-cli/src/main/resources/intro.txt
@@ -4,7 +4,7 @@ Try:
> !tables
> !schemas
> create view foo as select * from ads.ad_clicks natural join profile.members;
> !yaml select * from foo
> !specify select * from foo
> !pipeline select * from foo

