Skip to content

Commit

Permalink
feat: add dependsOn field for flux kcl controller
Browse files Browse the repository at this point in the history
Signed-off-by: peefy <[email protected]>
  • Loading branch information
Peefy committed Aug 20, 2024
1 parent 769591f commit 4cb4ff6
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
6 changes: 5 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"os"
"time"

flag "github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -65,6 +66,7 @@ func main() {
var (
metricsAddr string
eventsAddr string
requeueDependency time.Duration
enableLeaderElection bool
httpRetry int
defaultServiceAccount string
Expand All @@ -80,6 +82,7 @@ func main() {

flag.StringVar(&metricsAddr, "metrics-addr", ":8083", "The address the metric endpoint binds to.")
flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand Down Expand Up @@ -134,7 +137,8 @@ func main() {
StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), pollingOpts),
DisallowedFieldManagers: disallowedFieldManagers,
}).SetupWithManager(mgr, controller.KCLRunReconcilerOptions{
HTTPRetry: httpRetry,
DependencyRequeueInterval: requeueDependency,
HTTPRetry: httpRetry,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KCLRun")
os.Exit(1)
Expand Down
71 changes: 70 additions & 1 deletion internal/controller/kclrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@ type KCLRunReconciler struct {
DefaultServiceAccount string
DisallowedFieldManagers []string
artifactFetcher *fetch.ArchiveFetcher
requeueDependency time.Duration

statusManager string
}

type KCLRunReconcilerOptions struct {
HTTPRetry int
DependencyRequeueInterval time.Duration
HTTPRetry int
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -105,6 +107,7 @@ func (r *KCLRunReconciler) SetupWithManager(mgr ctrl.Manager, opts KCLRunReconci
fetch.WithUntar(tar.WithMaxUntarSize(tar.UnlimitedUntarSize)),
fetch.WithHostnameOverwrite(os.Getenv("SOURCE_CONTROLLER_LOCALHOST")),
)
r.requeueDependency = opts.DependencyRequeueInterval
r.statusManager = "gotk-flux-kcl-controller"
// New controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -214,6 +217,27 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return ctrl.Result{}, err
}
artifact := source.GetArtifact()

// Requeue the reconciliation if the source artifact is not found.
if artifact == nil {
msg := fmt.Sprintf("Source artifact not found, retrying in %s", r.requeueDependency.String())
conditions.MarkFalse(&obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg)
log.Info(msg)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}

// Check dependencies and requeue the reconciliation if the check fails.
if len(obj.Spec.DependsOn) > 0 {
if err := r.checkDependencies(ctx, &obj, source); err != nil {
conditions.MarkFalse(&obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
log.Info(msg)
r.event(&obj, artifact.Revision, eventv1.EventSeverityInfo, msg, nil)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
log.Info("All dependencies are ready, proceeding with reconciliation")
}

progressingMsg := fmt.Sprintf("new revision detected %s", artifact.Revision)
log.Info(progressingMsg)
conditions.MarkUnknown(&obj, meta.ReadyCondition, meta.ProgressingReason, "Reconciliation in progress")
Expand Down Expand Up @@ -368,6 +392,51 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return ctrl.Result{}, nil
}

func (r *KCLRunReconciler) checkDependencies(ctx context.Context,
obj *v1alpha1.KCLRun,
source sourcev1.Source) error {
for _, d := range obj.Spec.DependsOn {
if d.Namespace == "" {
d.Namespace = obj.GetNamespace()
}
dName := types.NamespacedName{
Namespace: d.Namespace,
Name: d.Name,
}
var k v1alpha1.KCLRun
err := r.Get(ctx, dName, &k)
if err != nil {
return fmt.Errorf("dependency '%s' not found: %w", dName, err)
}

if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration {
return fmt.Errorf("dependency '%s' is not ready", dName)
}

if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) {
return fmt.Errorf("dependency '%s' is not ready", dName)
}

srcNamespace := k.Spec.SourceRef.Namespace
if srcNamespace == "" {
srcNamespace = k.GetNamespace()
}
dSrcNamespace := obj.Spec.SourceRef.Namespace
if dSrcNamespace == "" {
dSrcNamespace = obj.GetNamespace()
}

if k.Spec.SourceRef.Name == obj.Spec.SourceRef.Name &&
srcNamespace == dSrcNamespace &&
k.Spec.SourceRef.Kind == obj.Spec.SourceRef.Kind &&
!source.GetArtifact().HasRevision(k.Status.LastAppliedRevision) {
return fmt.Errorf("dependency '%s' revision is not up to date", dName)
}
}

return nil
}

func (r *KCLRunReconciler) getSource(ctx context.Context,
obj *v1alpha1.KCLRun) (sourcev1.Source, error) {
var src sourcev1.Source
Expand Down

0 comments on commit 4cb4ff6

Please sign in to comment.