diff --git a/cmd/main.go b/cmd/main.go index 1dce786..f2be773 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,6 +18,7 @@ package main import ( "os" + "time" flag "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" @@ -65,6 +66,7 @@ func main() { var ( metricsAddr string eventsAddr string + requeueDependency time.Duration enableLeaderElection bool httpRetry int defaultServiceAccount string @@ -80,6 +82,7 @@ func main() { flag.StringVar(&metricsAddr, "metrics-addr", ":8083", "The address the metric endpoint binds to.") flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.") + flag.DurationVar(&requeueDependency, "requeue-dependency", 30*time.Second, "The interval at which failing dependencies are reevaluated.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") @@ -134,7 +137,8 @@ func main() { StatusPoller: polling.NewStatusPoller(mgr.GetClient(), mgr.GetRESTMapper(), pollingOpts), DisallowedFieldManagers: disallowedFieldManagers, }).SetupWithManager(mgr, controller.KCLRunReconcilerOptions{ - HTTPRetry: httpRetry, + DependencyRequeueInterval: requeueDependency, + HTTPRetry: httpRetry, }); err != nil { setupLog.Error(err, "unable to create controller", "controller", "KCLRun") os.Exit(1) diff --git a/internal/controller/kclrun_controller.go b/internal/controller/kclrun_controller.go index ff2e3cc..bf9acc4 100644 --- a/internal/controller/kclrun_controller.go +++ b/internal/controller/kclrun_controller.go @@ -88,12 +88,14 @@ type KCLRunReconciler struct { DefaultServiceAccount string DisallowedFieldManagers []string artifactFetcher *fetch.ArchiveFetcher + requeueDependency time.Duration statusManager string } type KCLRunReconcilerOptions struct { - HTTPRetry int + DependencyRequeueInterval time.Duration + HTTPRetry int } // SetupWithManager sets up the controller with the Manager. @@ -105,6 +107,7 @@ func (r *KCLRunReconciler) SetupWithManager(mgr ctrl.Manager, opts KCLRunReconci fetch.WithUntar(tar.WithMaxUntarSize(tar.UnlimitedUntarSize)), fetch.WithHostnameOverwrite(os.Getenv("SOURCE_CONTROLLER_LOCALHOST")), ) + r.requeueDependency = opts.DependencyRequeueInterval r.statusManager = "gotk-flux-kcl-controller" // New controller return ctrl.NewControllerManagedBy(mgr). @@ -214,6 +217,27 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err } artifact := source.GetArtifact() + + // Requeue the reconciliation if the source artifact is not found. + if artifact == nil { + msg := fmt.Sprintf("Source artifact not found, retrying in %s", r.requeueDependency.String()) + conditions.MarkFalse(&obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg) + log.Info(msg) + return ctrl.Result{RequeueAfter: r.requeueDependency}, nil + } + + // Check dependencies and requeue the reconciliation if the check fails. + if len(obj.Spec.DependsOn) > 0 { + if err := r.checkDependencies(ctx, &obj, source); err != nil { + conditions.MarkFalse(&obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err) + msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String()) + log.Info(msg) + r.event(&obj, artifact.Revision, eventv1.EventSeverityInfo, msg, nil) + return ctrl.Result{RequeueAfter: r.requeueDependency}, nil + } + log.Info("All dependencies are ready, proceeding with reconciliation") + } + progressingMsg := fmt.Sprintf("new revision detected %s", artifact.Revision) log.Info(progressingMsg) conditions.MarkUnknown(&obj, meta.ReadyCondition, meta.ProgressingReason, "Reconciliation in progress") @@ -368,6 +392,51 @@ func (r *KCLRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, nil } +func (r *KCLRunReconciler) checkDependencies(ctx context.Context, + obj *v1alpha1.KCLRun, + source sourcev1.Source) error { + for _, d := range obj.Spec.DependsOn { + if d.Namespace == "" { + d.Namespace = obj.GetNamespace() + } + dName := types.NamespacedName{ + Namespace: d.Namespace, + Name: d.Name, + } + var k v1alpha1.KCLRun + err := r.Get(ctx, dName, &k) + if err != nil { + return fmt.Errorf("dependency '%s' not found: %w", dName, err) + } + + if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration { + return fmt.Errorf("dependency '%s' is not ready", dName) + } + + if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) { + return fmt.Errorf("dependency '%s' is not ready", dName) + } + + srcNamespace := k.Spec.SourceRef.Namespace + if srcNamespace == "" { + srcNamespace = k.GetNamespace() + } + dSrcNamespace := obj.Spec.SourceRef.Namespace + if dSrcNamespace == "" { + dSrcNamespace = obj.GetNamespace() + } + + if k.Spec.SourceRef.Name == obj.Spec.SourceRef.Name && + srcNamespace == dSrcNamespace && + k.Spec.SourceRef.Kind == obj.Spec.SourceRef.Kind && + !source.GetArtifact().HasRevision(k.Status.LastAppliedRevision) { + return fmt.Errorf("dependency '%s' revision is not up to date", dName) + } + } + + return nil +} + func (r *KCLRunReconciler) getSource(ctx context.Context, obj *v1alpha1.KCLRun) (sourcev1.Source, error) { var src sourcev1.Source