Skip to content

Commit

Permalink
Updated finalizer and fixed looping issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Bobbins228 committed Jul 24, 2023
1 parent 79e81fb commit 8d6d112
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 47 deletions.
76 changes: 42 additions & 34 deletions controllers/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand All @@ -42,12 +43,11 @@ type AppWrapperReconciler struct {
}

var (
scaledAppwrapper []string
reuse = true
ocmClusterID string
ocmToken string
useMachineSets bool

scaledAppwrapper []string
reuse = true
ocmClusterID string
ocmToken string
useMachineSets bool
maxScaleNodesAllowed int
)

Expand Down Expand Up @@ -94,41 +94,49 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
getOCMClusterID(r)
}

//onAdd replacement
if appwrapper.Status.State == arbv1.AppWrapperStateEnqueued || appwrapper.Status.State == "" {
//scaledAppwrapper = append(scaledAppwrapper, aw.Name)
demandPerInstanceType := discoverInstanceTypes(&appwrapper)
//TODO: simplify the looping
if useMachineSets {
if r.canScaleMachineset(ctx, demandPerInstanceType) {
r.scaleUp(ctx, &appwrapper, demandPerInstanceType)
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
// Checks for finalizer on appwrapper where its deletion timestamp == 0
if appwrapper.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(&appwrapper, finalizerName) {
//onAdd replacement
if appwrapper.Status.State == arbv1.AppWrapperStateEnqueued || appwrapper.Status.State == "" {
demandPerInstanceType := discoverInstanceTypes(&appwrapper)
//TODO: simplify the looping
if useMachineSets {
if r.canScaleMachineset(ctx, demandPerInstanceType) {
r.scaleUp(ctx, &appwrapper, demandPerInstanceType)
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
}
} else {
if canScaleMachinepool(demandPerInstanceType) {
if err := r.scaleUp(ctx, &appwrapper, demandPerInstanceType); err != nil {
return ctrl.Result{}, err
}
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
}
}
return ctrl.Result{}, nil
}
} else {
if canScaleMachinepool(demandPerInstanceType) {
r.scaleUp(ctx, &appwrapper, demandPerInstanceType)
} else {
klog.Infof("Cannot scale up replicas max replicas allowed is %v", maxScaleNodesAllowed)
// Adds finalizer to the appwrapper
controllerutil.AddFinalizer(&appwrapper, finalizerName)
if err := r.Update(ctx, &appwrapper); err != nil {
return ctrl.Result{}, err
}
}

}

// Checks for finalizer on appwrapper where its deletion timestamp != 0
if !appwrapper.ObjectMeta.DeletionTimestamp.IsZero() {
if contains(appwrapper.ObjectMeta.Finalizers, finalizerName) {
} else {
// if the deletion timestamp != 0 then the machines are scaled down and the finalizer is removed
if controllerutil.ContainsFinalizer(&appwrapper, finalizerName) {
if err := r.finalizeScalingDownMachines(ctx, &appwrapper); err != nil {
return ctrl.Result{}, err
}

// Remove the finalizer from the AppWrapper's metadata
appwrapper.ObjectMeta.Finalizers = removeString(appwrapper.ObjectMeta.Finalizers, finalizerName)
controllerutil.RemoveFinalizer(&appwrapper, finalizerName)
if err := r.Update(ctx, &appwrapper); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil

}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -226,7 +234,7 @@ func canScaleMachinepool(demandPerInstanceType map[string]int) bool {
return true
}

func (r *AppWrapperReconciler) scaleUp(ctx context.Context, aw *arbv1.AppWrapper, demandMapPerInstanceType map[string]int) {
func (r *AppWrapperReconciler) scaleUp(ctx context.Context, aw *arbv1.AppWrapper, demandMapPerInstanceType map[string]int) error {
//Assumption is made that the cluster has machineset configure that AW needs
for userRequestedInstanceType := range demandMapPerInstanceType {
//TODO: get unique machineset
Expand All @@ -235,12 +243,12 @@ func (r *AppWrapperReconciler) scaleUp(ctx context.Context, aw *arbv1.AppWrapper
if useMachineSets {
r.scaleMachineSet(ctx, aw, userRequestedInstanceType, replicas)
} else {
scaleMachinePool(aw, userRequestedInstanceType, replicas)
r.scaleMachinePool(ctx, aw, userRequestedInstanceType, replicas)
}
}
klog.Infof("Completed Scaling for %v", aw.Name)
scaledAppwrapper = append(scaledAppwrapper, aw.Name)

return nil
}

func (r *AppWrapperReconciler) IsAwPending(ctx context.Context) (false bool, aw *arbv1.AppWrapper) {
Expand Down
6 changes: 4 additions & 2 deletions controllers/machinepools.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"k8s.io/klog"
)

func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) {
func (r *AppWrapperReconciler) scaleMachinePool(ctx context.Context, aw *arbv1.AppWrapper, userRequestedInstanceType string, replicas int) error {
logger, err := ocmsdk.NewGoLoggerBuilder().
Debug(false).
Build()
Expand Down Expand Up @@ -46,11 +46,12 @@ func scaleMachinePool(aw *arbv1.AppWrapper, userRequestedInstanceType string, re
klog.Errorf(`Error building MachinePool: %v`, err)
}
klog.Infof("Built MachinePool with instance type %v and name %v", userRequestedInstanceType, createMachinePool.ID())
response, err := clusterMachinePools.Add().Body(createMachinePool).SendContext(context.Background())
response, err := clusterMachinePools.Add().Body(createMachinePool).SendContext(ctx)
if err != nil {
klog.Errorf(`Error creating MachinePool: %v`, err)
}
klog.Infof("Created MachinePool: %v", response)
return nil
}

func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.AppWrapper) {
Expand Down Expand Up @@ -82,6 +83,7 @@ func (r *AppWrapperReconciler) deleteMachinePool(ctx context.Context, aw *arbv1.
if err != nil {
klog.Infof("Error deleting target machinepool %v", targetMachinePool)
}
klog.Infof("Successfully Scaled down target machinepool %v", id)
}
return true
})
Expand Down
11 changes: 0 additions & 11 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,3 @@ func contains(s []string, str string) bool {

return false
}

// removeString removes a string from a string slice
func removeString(slice []string, str string) []string {
result := make([]string, 0, len(slice))
for _, s := range slice {
if s != str {
result = append(result, s)
}
}
return result
}

0 comments on commit 8d6d112

Please sign in to comment.