Skip to content

Commit

Permalink
Re-plan pipelines when hints change
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Feb 5, 2024
1 parent 1c992be commit d9a5c17
Show file tree
Hide file tree
Showing 17 changed files with 74 additions and 18 deletions.
5 changes: 5 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ spec:
sql:
description: The SQL being implemented by this pipeline.
type: string
hints:
description: The hints being used by this pipeline.
type: object
additionalProperties:
type: string
resources:
description: The YAML generated to implement this pipeline.
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
import io.swagger.annotations.ApiModelProperty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
private Boolean failed;

public static final String SERIALIZED_NAME_HINTS = "hints";
@SerializedName(SERIALIZED_NAME_HINTS)
private Map<String, String> hints = null;

public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
private String message;
Expand Down Expand Up @@ -76,6 +82,37 @@ public void setFailed(Boolean failed) {
}


public V1alpha1SubscriptionStatus hints(Map<String, String> hints) {

this.hints = hints;
return this;
}

public V1alpha1SubscriptionStatus putHintsItem(String key, String hintsItem) {
if (this.hints == null) {
this.hints = new HashMap<>();
}
this.hints.put(key, hintsItem);
return this;
}

/**
* The hints being used by this pipeline.
* @return hints
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The hints being used by this pipeline.")

public Map<String, String> getHints() {
return hints;
}


public void setHints(Map<String, String> hints) {
this.hints = hints;
}


public V1alpha1SubscriptionStatus message(String message) {

this.message = message;
Expand Down Expand Up @@ -186,6 +223,7 @@ public boolean equals(Object o) {
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
Objects.equals(this.hints, v1alpha1SubscriptionStatus.hints) &&
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Expand All @@ -194,7 +232,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(failed, message, ready, resources, sql);
return Objects.hash(failed, hints, message, ready, resources, sql);
}


Expand All @@ -203,6 +241,7 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
sb.append(" hints: ").append(toIndentedString(hints)).append("\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -79,13 +80,17 @@ public Result reconcile(Request request) {
object.setStatus(status);
}

if (object.getSpec().getHints() == null) {
object.getSpec().setHints(new HashMap<>());
}

// We deploy in three phases:
// 1. Plan a pipeline, and write the plan to Status.
// 2. Deploy the pipeline per plan.
// 3. Verify readiness of the entire pipeline.
// Each phase should be a separate reconcilation loop to avoid races.
// TODO: We should disown orphaned resources when the pipeline changes.
if (status.getSql() == null || !status.getSql().equals(object.getSpec().getSql())) {
if (diverged(object.getSpec(), status)) {
// Phase 1
log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());

Expand Down Expand Up @@ -121,6 +126,7 @@ public Result reconcile(Request request) {
status.setResources(combined);

status.setSql(object.getSpec().getSql());
status.setHints(object.getSpec().getHints());
status.setReady(null); // null indicates that pipeline needs to be deployed
status.setFailed(null);
status.setMessage("Planned.");
Expand Down Expand Up @@ -175,7 +181,7 @@ public Result reconcile(Request request) {
return result;
}

Pipeline pipeline(V1alpha1Subscription object) throws Exception {
private Pipeline pipeline(V1alpha1Subscription object) throws Exception {
String name = object.getMetadata().getName();
String sql = object.getSpec().getSql();
String database = object.getSpec().getDatabase();
Expand Down Expand Up @@ -291,6 +297,12 @@ private static boolean isReady(DynamicKubernetesObject obj) {
return true;
}

// Whether status has diverged from spec (i.e. we need to re-plan the pipeline)
private static boolean diverged(V1alpha1SubscriptionSpec spec, V1alpha1SubscriptionStatus status) {
return status.getSql() == null || !status.getSql().equals(spec.getSql())
|| !status.getHints().equals(spec.getHints());
}

public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory, Resource.Environment environment) {
Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory, environment);
return ControllerBuilder.defaultBuilder(operator.informerFactory())
Expand Down

0 comments on commit d9a5c17

Please sign in to comment.