Skip to content

Commit

Permalink
Add support for transformations in resource templates
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jan 31, 2024
1 parent 9496cba commit bef0183
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ deploy-dev-environment:
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl create namespace mysql || echo "skipping"
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
Expand Down
2 changes: 1 addition & 1 deletion deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata:
namespace: kafka
spec:
kafka:
version: 3.4.0
version: 3.6.1
replicas: 1
listeners:
- name: plain
Expand Down
17 changes: 9 additions & 8 deletions etc/integration-tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
SELECT * FROM DATAGEN.PERSON;
SELECT * FROM DATAGEN.COMPANY;

-- test mermaid and yaml commands
!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON

-- test insert into command
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- MySQL CDC tables
SELECT * FROM INVENTORY."products_on_hand" LIMIT 1;

-- Test check command
!check not empty SELECT * FROM INVENTORY."products_on_hand";

-- MySQL CDC -> Kafka
-- MySQL CDC -> Kafka (via sample subscription "products")
SELECT * FROM RAWKAFKA."products" LIMIT 1;

-- test insert into command
!insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
SELECT * FROM RAWKAFKA."test-sink" LIMIT 5;

-- test mermaid and yaml commands
!mermaid insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
!yaml insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.hoptimator.catalog;

import java.util.Locale;

public final class Names {

private Names() {
}

/** Attempt to format s as a K8s object name, or part of one. */
public static String canonicalize(String s) {
return s.toLowerCase(Locale.ROOT)
.replaceAll("[^a-z0-9\\-]+", "-")
.replaceAll("^[^a-z0-9]*", "")
.replaceAll("[^a-z0-9]*$", "")
.replaceAll("\\-+", "-");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -255,7 +257,21 @@ public interface Template {
* Replaces `{{var}}` in a template file with the corresponding variable.
*
* Resource-scoped variables take precedence over Environment-scoped
* variables. Default values can supplied with `{{var:default}}`.
* variables.
*
* Default values can supplied with `{{var:default}}`.
*
* Built-in transformations can be applied to variables, including:
*
* - `{{var toName}}`, `{{var:default toName}}`: canonicalize the
* variable as a valid K8s object name.
* - `{{var toUpperCase}}`, `{{var:default toUpperCase}}`: render in
* all upper case.
* - `{{var toLowerCase}}`, `{{var:default toLowerCase}}`: render in
* all lower case.
* - `{{var concat}}`, `{{var:default concat}}`: concatinate a multiline
* string into one line
* - `{{var concat toUpperCase}}`: apply both transformations in sequence.
*
* If `var` contains multiple lines, the behavior depends on context;
* specifically, whether the pattern appears within a list or comment
Expand Down Expand Up @@ -288,7 +304,8 @@ public SimpleTemplate(Environment env, String template) {
@Override
public String render(Resource resource) {
StringBuffer sb = new StringBuffer();
Pattern p = Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*\\}\\}");
Pattern p = Pattern.compile(
"([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\W*)*)\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
Expand All @@ -297,18 +314,47 @@ public String render(Resource resource) {
}
String key = m.group(2);
String defaultValue = m.group(4);
String transform = m.group(5);
String value = resource.getOrDefault(key, () -> env.getOrDefault(key, () -> defaultValue));
if (value == null) {
throw new IllegalArgumentException(template + " has no value for key " + key + ".");
}
String transformedValue = applyTransform(value, transform);
String quotedPrefix = Matcher.quoteReplacement(prefix);
String quotedValue = Matcher.quoteReplacement(value);
String quotedValue = Matcher.quoteReplacement(transformedValue);
String replacement = quotedPrefix + quotedValue.replaceAll("\\n", quotedPrefix);
m.appendReplacement(sb, replacement);
}
m.appendTail(sb);
return sb.toString();
}

private static String applyTransform(String value, String transform) {
String res = value;
String[] funcs = transform.split("\\W+");
for (String f : funcs) {
switch (f) {
case "toLowerCase":
res = res.toLowerCase(Locale.ROOT);
break;
case "toUpperCase":
res = res.toUpperCase(Locale.ROOT);
break;
case "toName":
res = canonicalizeName(res);
break;
case "concat":
res = res.replace("\n", "");
break;
}
}
return res;
}

/** Attempt to format s as a K8s object name, or part of one. */
protected static String canonicalizeName(String s) {
return Names.canonicalize(s);
}
}

/** Locates a Template for a given Resource */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.junit.Assert.assertEquals;
import org.junit.Test;

import java.util.function.Function;

public class ResourceTest {

@Test
Expand All @@ -25,4 +27,25 @@ public void handlesChainedEnvironments() {
assertEquals("bar", env.getOrDefault("foo", () -> "x"));
assertEquals("x", env.getOrDefault("oof", () -> "x"));
}

@Test
public void rendersTemplates() {
Resource.Environment env = new Resource.SimpleEnvironment() {{
export("one", "1");
export("foo", "bar");
}};
Resource res = new Resource("x") {{
export("car", "Hyundai Accent");
export("parts", "wheels\nseats\nbrakes\nwipers");
}};

Function<String, Resource.Template> f = x -> new Resource.SimpleTemplate(env, x);
assertEquals("xyz", f.apply("xyz").render(res));
assertEquals("bar", f.apply("{{foo}}").render(res));
assertEquals("bar", f.apply("{{ foo }}").render(res));
assertEquals("abc", f.apply("{{xyz:abc}}").render(res));
assertEquals("hyundai-accent", f.apply("{{car toName}}").render(res));
assertEquals("HYUNDAI-ACCENT", f.apply("{{car toName toUpperCase}}").render(res));
assertEquals("WHEELSSEATSBRAKESWIPERS", f.apply("{{parts concat toUpperCase}}").render(res));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import com.linkedin.hoptimator.catalog.Resource;

import java.util.Locale;
import java.util.Map;

class KafkaTopic extends Resource {
KafkaTopic(String topicName, Map<String, String> clientOverrides) {
super("KafkaTopic");
export("topicName", topicName);
export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT));
export("clientOverrides", clientOverrides);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class KafkaTopicAcl extends Resource {
public KafkaTopicAcl(String topicName, String principal, String method) {
super("KafkaTopicAcl");
export("topicName", topicName);
export("topicNameLowerCase", topicName.toLowerCase(Locale.ROOT));
export("principal", principal);
export("method", method);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaTopic
metadata:
name: {{topicNameLowerCase}}
name: {{topicName toName}}
namespace: {{pipeline.namespace}}
spec:
topicName: {{topicName}}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Acl
metadata:
name: {{topicNameLowerCase}}-acl-{{id}}
name: {{topicName toName}}-acl-{{id}}
namespace: {{pipeline.namespace}}
spec:
resource:
kind: KafkaTopic
name: {{topicNameLowerCase}}
name: {{topicName toName}}
method: {{method}}
principal: {{principal}}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public Result reconcile(Request request) {

String kind = object.getKind();

object.getMetadata().setNamespace(namespace);

V1alpha1SubscriptionStatus status = object.getStatus();
if (status == null) {
status = new V1alpha1SubscriptionStatus();
Expand Down Expand Up @@ -198,6 +200,10 @@ private boolean apply(String yaml, V1alpha1Subscription owner) {

DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
String namespace = obj.getMetadata().getNamespace();
if (namespace == null) {
namespace = owner.getMetadata().getNamespace();
obj.getMetadata().setNamespace(namespace);
}
String name = obj.getMetadata().getName();
KubernetesApiResponse<DynamicKubernetesObject> existing = operator.apiFor(obj).get(namespace, name);
if (existing.isSuccess()) {
Expand Down

0 comments on commit bef0183

Please sign in to comment.