
Add Venice support #78

Open: wants to merge 5 commits into main

Conversation

@jogrogan (Collaborator) commented Dec 17, 2024

Adds Venice support to Hoptimator.

  • Queries Venice for the Key and Value Avro schemas and merges them into one schema; key fields are prefixed with KEY$ to prevent collisions
  • Passes KEY fields through to the Sink options (intended to be used by Flink under the key.fields connector property)
  • Supports partial inserts of the form insert into "VENICE-CLUSTER0"."test-store-1" ("KEY$id", "stringField") SELECT ...
    • Fields not specified in the insert are required to be nullable (this may still be a problem for fields that are not nullable but do have a default in Avro)
    • Captures the targetFields specified in the insert and drops the other fields
    • Rewrites the INSERT with aliasing for the specified fields (see examples in comments around ScriptImplementor)
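The key/value schema merge described above could be sketched roughly as follows (a minimal illustration with assumed helper and class names, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: prefix key-schema fields with "KEY$" before merging them with the
// value-schema fields, so a name like "id" appearing in both cannot collide.
public class SchemaMergeSketch {
  static List<String> mergeFieldNames(List<String> keyFields, List<String> valueFields) {
    List<String> merged = new ArrayList<>();
    for (String keyField : keyFields) {
      merged.add("KEY$" + keyField);
    }
    merged.addAll(valueFields);
    return merged;
  }

  public static void main(String[] args) {
    // e.g. key schema {id}, value schema {intField, stringField}
    System.out.println(mergeFieldNames(List.of("id"), List.of("intField", "stringField")));
    // prints [KEY$id, intField, stringField]
  }
}
```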

Other changes in this PR:

  • Adds Venice tests
  • Adds the ability to stand up Venice locally in Docker
  • Cleans up Makefile targets
  • Addresses various checkstyle errors

Implemented the Venice driver/schema classes with separate overridable functions, so that company-internal connection components can be handled via a simple override.
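The override pattern might look something like this sketch (class names, method names, and URLs are all assumptions for illustration, not the PR's actual API):

```java
// Sketch of the override pattern: connection details live in small protected
// methods, so an internal subclass only needs to replace those.
public class VeniceDriverSketch {
  protected String controllerUrl(String cluster) {
    return "http://localhost:5555"; // assumed local-Docker default
  }

  public String connectionString(String cluster) {
    return controllerUrl(cluster) + "/" + cluster;
  }
}

class InternalVeniceDriverSketch extends VeniceDriverSketch {
  @Override
  protected String controllerUrl(String cluster) {
    // hypothetical company-internal endpoint, swapped in via a simple override
    return "https://venice-controller.internal.example.com";
  }
}
```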

./hoptimator
0: Hoptimator> create or replace materialized view "VENICE-CLUSTER0"."my-store" as select * from "VENICE-CLUSTER0"."test-store";

0: Hoptimator> !tables
+-----------+-----------------+------------------+-------------------+---------+
| TABLE_CAT |   TABLE_SCHEM   |    TABLE_NAME    |    TABLE_TYPE     | REMARKS |
+-----------+-----------------+------------------+-------------------+---------+
...
|           | VENICE-CLUSTER0 | my-store         | MATERIALIZED VIEW |         |
|           | VENICE-CLUSTER0 | test-store       | TABLE             |         |
|           | VENICE-CLUSTER0 | test-store-1     | TABLE             |         |
...
+-----------+-----------------+------------------+-------------------+---------+
$ k get flinkdeployments.flink.apache.org -o yaml venice-cluster0-my-store
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: "2024-12-24T20:38:41Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 2
  name: venice-cluster0-my-store
  namespace: default
  resourceVersion: "32186"
  uid: e0d855d3-af15-49d9-a16b-13c87579a23e
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  flinkVersion: v1_16
  image: docker.io/library/hoptimator-flink-runner
  imagePullPolicy: Never
  job:
    args:
    - CREATE TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR,
      `KEY` ROW(`id` INTEGER)) WITH ('connector'='venice', 'key.fields'='KEY_id',
      'key.fields-prefix'='KEY_', 'partial-update-mode'='true', 'storeName'='test-store',
      'value.fields-include'='EXCEPT_KEY');
    - CREATE TABLE IF NOT EXISTS `my-store` (`intField` INTEGER, `stringField` VARCHAR,
      `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_',
      'partial-update-mode'='true', 'storeName'='my-store', 'value.fields-include'='EXCEPT_KEY');
    - INSERT INTO `my-store` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`;
    entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
    jarURI: local:///opt/hoptimator-flink-runner.jar
    parallelism: 1
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 0.1
      memory: 2048m
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 0.1
      memory: 2048m
status:
  clusterInfo: {}
  jobManagerDeploymentStatus: DEPLOYING
  jobStatus:
    checkpointInfo:
      lastPeriodicCheckpointTimestamp: 0
    jobId: 49de826b428ba2d93e4fac8b120de2c4
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    state: RECONCILING
  lifecycleState: DEPLOYED
  observedGeneration: 2
  reconciliationStatus:
    lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/hoptimator-flink-runner.jar","parallelism":1,"entryClass":"com.linkedin.hoptimator.flink.runner.FlinkRunner","args":["CREATE
      TABLE IF NOT EXISTS `test-store` (`intField` INTEGER, `stringField` VARCHAR,
      `KEY` ROW(`id` INTEGER)) WITH (''connector''=''venice'', ''key.fields''=''KEY_id'',
      ''key.fields-prefix''=''KEY_'', ''partial-update-mode''=''true'', ''storeName''=''test-store'',
      ''value.fields-include''=''EXCEPT_KEY'');","CREATE TABLE IF NOT EXISTS `my-store`
      (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH (''connector''=''venice'',
      ''key.fields''=''KEY_id'', ''key.fields-prefix''=''KEY_'', ''partial-update-mode''=''true'',
      ''storeName''=''my-store'', ''value.fields-include''=''EXCEPT_KEY'');","INSERT
      INTO `my-store` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`;"],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"1"},"image":"docker.io/library/hoptimator-flink-runner","imagePullPolicy":"Never","serviceAccount":"flink","flinkVersion":"v1_16","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":0.1,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":0.1,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1735072722874
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=venice-cluster0-my-store
    replicas: 1

See the included tests for more samples.

this.sinkOptions = addKeysAsOption(options, rowType);
}

private Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
@jogrogan (Collaborator, Author) commented Dec 17, 2024

I don't love this approach, open to suggestions.

I looked into hints to solve this and did get them working to an extent (will open a separate PR), but that would require users to pass key information in their SQL statement. I have not figured out a way to inject hints at runtime from VeniceDriver.
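For context, a minimal sketch of what such an option-injection helper could do (the real method takes a Calcite RelDataType; a plain field-name list stands in for it here, and the option values mirror the generated DDL shown earlier):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: collect the KEY$-prefixed row fields into Flink's 'key.fields'
// and 'key.fields-prefix' connector options, as seen in the generated DDL.
public class KeyOptionsSketch {
  static Map<String, String> addKeysAsOption(Map<String, String> options, List<String> fieldNames) {
    Map<String, String> sinkOptions = new LinkedHashMap<>(options);
    String keyFields = fieldNames.stream()
        .filter(f -> f.startsWith("KEY$"))
        .map(f -> "KEY_" + f.substring("KEY$".length()))
        .collect(Collectors.joining(";")); // Flink list options are ';'-separated
    if (!keyFields.isEmpty()) {
      sinkOptions.put("key.fields", keyFields);
      sinkOptions.put("key.fields-prefix", "KEY_");
      sinkOptions.put("value.fields-include", "EXCEPT_KEY");
    }
    return sinkOptions;
  }
}
```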

Collaborator:

I guess I'm surprised we need to fully specify the keys in the options. The Kafka connector has similar properties (key.prefix, key.fields), but you don't need both. Is the Venice connector doing something different here? I'd expect key.prefix=key_ to be sufficient.

Collaborator:

Also, how would the Venice connector behave if we grouped the keys in a Row(...) object? Can we just have key.fields=KEY and then KEY ROW(F1 VARCHAR, F2 INT) etc?

(Not suggesting we do that, just asking if possible?)

@jogrogan (Collaborator, Author) commented Dec 18, 2024

> I guess I'm surprised we need to fully specify the keys in the options. The Kafka connector has similar properties (key.prefix, key.fields), but you don't need both. Is the Venice connector doing something different here? I'd expect key.prefix=key_ to be sufficient.

Yeah, I confirmed it is an issue with Venice due to some additional Avro schema validation they do. They pull the key schema and validate it against key.fields (separate from the prefix). The key.fields-prefix allows these names to be different, like "id" vs "key_id".

@jogrogan (Collaborator, Author):

Looking into the ROW syntax, it doesn't seem that is possible in Flink; there is no way to get Flink to destructure that ROW.

Collaborator:

Makes sense. I think that's fine. The only slight concern I have is potential deviation from the Kafka -> Kafka use case. With Kafka -> Kafka, users can do select * and retain partitioning, since the output topic will use the input topic's KEY field. Users might expect select * to work similarly with Kafka -> Venice, except Kafka has one key (KEY) and Venice has one or more keys (KEY_xyz), so that might not work. I'm thinking we should actually change the way Kafka works and adopt this KEY_xyz approach, or maybe have PipelineRel explicitly use KEY in cases where there is only one key.

@jogrogan (Collaborator, Author):

What does retaining partitioning mean for the Kafka -> Venice use case? It seems like more of a problem on the producer side. Users that expect the same partitioning behavior would have to key their Kafka topic using the same combination of keys as Venice. We did this in Brooklin by constructing the producer key as a simple string, with key values from the source keys separated by _. Of course this isn't the same as identity partitioning, but it does ensure that downstream consumer tasks read the same combination of keys.

Even for the Kafka -> Kafka use case, we aren't the ones consuming; the partitioning behavior comes from Flink, right? To be fair, I haven't looked into it, so I'm not sure how the behavior changes if you define key.fields there.

Makefile
@@ -9,8 +9,8 @@ build:

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

# Integration tests expect K8s and Kafka to be running
integration-tests: deploy-dev-environment deploy-samples
# Integration tests expect K8s, Kafka, and Venice to be running
Collaborator:

🔥 🔥 🔥


@@ -0,0 +1,11 @@
{
"type": "record",
Collaborator:

We need to get create table venice.foo working :)

@jogrogan (Collaborator, Author):

Agreed, I will spend some time looking into this when I can. It should be a simple API call, just as I'm doing to fetch schemas; it's just different from the current paradigm since it isn't managed via K8s.

// Without forced projection this will get optimized to:
// INSERT INTO `my-store` (`KEYFIELD`, `VARCHARFIELD`) SELECT * FROM `KAFKA`.`existing-topic-1`;
// With forced projection this will resolve as:
// INSERT INTO `my-store` (`KEY_id`, `stringField`) SELECT `KEYFIELD` AS `KEY_id`, \
Collaborator:

neat!

@ryannedolan (Collaborator) left a comment:

The if (schema.startsWith("VENICE")...) logic needs to be fixed, but I think we can accept the TODO and fix later.
