Skip to content

Commit

Permalink
address review comments - part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabh-11 committed Dec 26, 2024
1 parent f3774f4 commit 9063248
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 85 deletions.
78 changes: 27 additions & 51 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"fmt"
"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"slices"
"strconv"
"strings"
Expand All @@ -39,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -364,47 +362,47 @@ func (machineDeployment *MachineDeployment) DecreaseTargetSize(delta int) error
func (machineDeployment *MachineDeployment) Refresh() error {
machineDeployment.scalingMutex.Lock()
defer machineDeployment.scalingMutex.Unlock()
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name)
mcd, err := machineDeployment.mcmManager.GetMachineDeploymentResource(machineDeployment.Name)
if err != nil {
return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err)
}
// ignore the machine deployment if it is in rolling update
if !isRollingUpdateFinished(mcd) {
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
return nil
return err
}
markedMachines := getMachinesMarkedByCAForDeletion(mcd)
markedMachineNames := getMachineNamesMarkedByCAForDeletion(mcd)
machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
return err
}
var incorrectlyMarkedMachines []*types.NamespacedName
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace})
}
}
var updatedMarkedMachines []string
for machineName := range markedMachines {
// update the machines-marked-by-ca-for-deletion annotation with the machines that are still marked for deletion by CA.
// This is done to ensure that the machines that are no longer present are removed from the annotation.
var updatedMarkedMachineNames []string
for _, machineName := range markedMachineNames {
if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool {
return mc.Name == machineName
}) {
updatedMarkedMachines = append(updatedMarkedMachines, machineName)
updatedMarkedMachineNames = append(updatedMarkedMachineNames, machineName)
}
}
clone := mcd.DeepCopy()
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",")
clone.Annotations[machinesMarkedByCAForDeletion] = createMachinesMarkedForDeletionAnnotationValue(updatedMarkedMachineNames)
ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout)
defer cancelFn()
_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{})
if err != nil {
return err
}
// reset the priority for the machines that are not present in machines-marked-by-ca-for-deletion annotation
var incorrectlyMarkedMachines []types.NamespacedName
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if isMachineFailedOrTerminating(machine) {
continue
}
// check if the machine is marked for deletion by CA but not present in machines-marked-by-ca-for-deletion annotation. This means that CA was not able to reduce the replicas
// corresponding to this machine and hence the machine should not be marked for deletion.
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForDeletionCandidateMachines && !slices.Contains(markedMachineNames, machine.Name) {
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace})
}
}
return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines)
}

Expand Down Expand Up @@ -554,32 +552,10 @@ func (machineDeployment *MachineDeployment) AtomicIncreaseSize(delta int) error
return cloudprovider.ErrNotImplemented
}

// buildMachineDeploymentFromSpec parses a node group spec string into a
// MachineDeployment. spec.Name is expected to be "<namespace>.<name>";
// a malformed name is rejected with an error instead of panicking.
func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*MachineDeployment, error) {
	spec, err := dynamic.SpecFromString(value, true)
	if err != nil {
		return nil, fmt.Errorf("failed to parse node group spec: %v", err)
	}
	// Cut at the first "." so that names containing dots keep their suffix;
	// namespaces are RFC 1123 labels and cannot themselves contain dots.
	namespace, name, found := strings.Cut(spec.Name, ".")
	if !found {
		return nil, fmt.Errorf("invalid node group spec name %q: expected format <namespace>.<name>", spec.Name)
	}
	return buildMachineDeployment(mcmManager, spec.MinSize, spec.MaxSize, namespace, name), nil
}

// getMachinesMarkedByCAForDeletion returns the set of machine names recorded in
// the machines-marked-by-ca-for-deletion annotation of the MachineDeployment.
// An absent or empty annotation yields an empty set; without the guard,
// strings.Split("", ",") would return [""] and poison the set with an empty name.
func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] {
	if mcd.Annotations == nil || mcd.Annotations[machinesMarkedByCAForDeletion] == "" {
		return sets.New[string]()
	}
	return sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
}

func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
return &MachineDeployment{
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
scalingMutex: sync.Mutex{},
NamespacedName: types.NamespacedName{
Name: name,
Namespace: namespace,
},
// getMachineNamesMarkedByCAForDeletion returns the machine names listed in the
// machines-marked-by-ca-for-deletion annotation of the MachineDeployment.
// An absent or empty annotation yields an empty slice.
func getMachineNamesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) []string {
	// Indexing a nil map safely returns the zero value, so a single
	// empty-string check covers both the nil-map and empty-annotation cases.
	annotValue := mcd.Annotations[machinesMarkedByCAForDeletion]
	if annotValue == "" {
		return make([]string, 0)
	}
	return strings.Split(annotValue, ",")
}
104 changes: 70 additions & 34 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
v1appslister "k8s.io/client-go/listers/apps/v1"
"k8s.io/utils/pointer"
"maps"
Expand All @@ -37,6 +38,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

awsapis "github.com/gardener/machine-controller-manager-provider-aws/pkg/aws/apis"
Expand Down Expand Up @@ -77,9 +79,9 @@ const (
defaultResetAnnotationTimeout = 10 * time.Second
// defaultPriorityValue is the default value for the priority annotation used by CA. It is set to 3 because MCM defaults the priority of machine it creates to 3.
defaultPriorityValue = "3"
// priorityValueForCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1.
priorityValueForCandidateMachines = "1"
minResyncPeriodDefault = 1 * time.Hour
// priorityValueForDeletionCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1.
priorityValueForDeletionCandidateMachines = "1"
minResyncPeriodDefault = 1 * time.Hour
// machinePriorityAnnotation is the annotation to set machine priority while deletion
machinePriorityAnnotation = "machinepriority.machine.sapcloud.io"
// kindMachineClass is the kind for generic machine class used by the OOT providers
Expand Down Expand Up @@ -413,11 +415,11 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *types.NamespacedNam
// Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired.
// It will select the machines to reset the priority based on the descending order of creation timestamp.
func (m *McmManager) Refresh() error {
var collectiveError error
var collectiveError []error
for _, machineDeployment := range m.machineDeployments {
collectiveError = errors.Join(collectiveError, machineDeployment.Refresh())
collectiveError = append(collectiveError, machineDeployment.Refresh())
}
return collectiveError
return errors.Join(collectiveError...)
}

// Cleanup does nothing at the moment.
Expand All @@ -428,18 +430,17 @@ func (m *McmManager) Cleanup() {

// GetMachineDeploymentSize returns the replicas field of the MachineDeployment
func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployment) (int64, error) {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
md, err := m.GetMachineDeploymentResource(machinedeployment.Name)
if err != nil {
return 0, fmt.Errorf("Unable to fetch MachineDeployment object %s %v", machinedeployment.Name, err)
return 0, err
}
return int64(md.Spec.Replicas), nil
}

// SetMachineDeploymentSize sets the desired size for the Machinedeployment.
func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeployment *MachineDeployment, size int64) (bool, error) {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
md, err := m.GetMachineDeploymentResource(machinedeployment.Name)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err)
return true, err
}
// don't scale down during rolling update, as that could remove ready node with workload
Expand All @@ -466,24 +467,23 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) e
commonMachineDeployment.scalingMutex.Lock()
defer commonMachineDeployment.scalingMutex.Unlock()
// get the machine deployment and return if rolling update is not finished
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name)
md, err := m.GetMachineDeploymentResource(commonMachineDeployment.Name)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", commonMachineDeployment.Name, err)
return err
}
if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}
markedMachines := getMachinesMarkedByCAForDeletion(md)
machineNamesMarkedByCA := getMachineNamesMarkedByCAForDeletion(md)
// update priorities of machines to be deleted except the ones already in termination to 1
machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
machineNamesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
if err != nil {
return err
}
markedMachines.Insert(machinesWithPrio1...)
machineNamesMarkedByCA = append(machineNamesMarkedByCA, machineNamesWithPrio1...)
// Trying to update the machineDeployment till the deadline
err = m.retry(func(ctx context.Context) (bool, error) {
return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machinesWithPrio1), strings.Join(markedMachines.UnsortedList(), ","))
return m.scaleDownAndAnnotateMachineDeployment(ctx, commonMachineDeployment.Name, len(machineNamesWithPrio1), createMachinesMarkedForDeletionAnnotationValue(machineNamesMarkedByCA))
}, "MachineDeployment", "update", commonMachineDeployment.Name)
if err != nil {
klog.Errorf("unable to scale in machine deployment %s, Error: %v", commonMachineDeployment.Name, err)
Expand All @@ -492,16 +492,16 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) e
}

// resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) error {
var collectiveError error
func (m *McmManager) resetPriorityForMachines(mcRefs []types.NamespacedName) error {
var collectiveError []error
for _, mcRef := range mcRefs {
machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
if kube_errors.IsNotFound(err) {
klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcRef.Name)
continue
}
if err != nil {
collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
collectiveError = append(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err))
continue
}
ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(defaultResetAnnotationTimeout))
Expand All @@ -515,17 +515,18 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) er
return nil
}()
if err != nil {
collectiveError = errors.Join(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err))
collectiveError = append(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err))
continue
}
}
return collectiveError
return errors.Join(collectiveError...)
}

// prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.NamespacedName) ([]string, error) {
var expectedToTerminateMachineNodePairs = make(map[string]string)
var machinesMarkedWithPrio1 []string
var prio1MarkedMachineNames []string

for _, machineRef := range targetMachineRefs {
// Trying to update the priority of machineRef till m.maxRetryTimeout
if err := m.retry(func(ctx context.Context) (bool, error) {
Expand All @@ -541,20 +542,20 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.Na
if isMachineFailedOrTerminating(mc) {
return false, nil
}
if mc.Annotations[machinePriorityAnnotation] == priorityValueForCandidateMachines {
if mc.Annotations[machinePriorityAnnotation] == priorityValueForDeletionCandidateMachines {
klog.Infof("Machine %q priority is already set to 1, hence skipping the update", mc.Name)
return false, nil
}
machinesMarkedWithPrio1 = append(machinesMarkedWithPrio1, machineRef.Name)
prio1MarkedMachineNames = append(prio1MarkedMachineNames, machineRef.Name)
expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"]
return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines)
return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForDeletionCandidateMachines)
}, "Machine", "update", machineRef.Name); err != nil {
klog.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
return nil, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err)
}
}
klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs)
return machinesMarkedWithPrio1, nil
return prio1MarkedMachineNames, nil
}

// updateAnnotationOnMachine returns error only when updating the annotations on machine has been failing consequently and deadline is crossed
Expand Down Expand Up @@ -583,9 +584,8 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin
// scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down.
// It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion.
func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
md, err := m.GetMachineDeploymentResource(mdName)
if err != nil {
klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err)
return true, err
}
mdclone := md.DeepCopy()
Expand Down Expand Up @@ -732,21 +732,19 @@ func validateNodeTemplate(nodeTemplateAttributes *v1alpha1.NodeTemplate) error {

// GetMachineDeploymentAnnotations returns the annotations present on the machine deployment for the provided machine deployment name
func (m *McmManager) GetMachineDeploymentAnnotations(machineDeploymentName string) (map[string]string, error) {
md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machineDeploymentName)
md, err := m.GetMachineDeploymentResource(machineDeploymentName)
if err != nil {
return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machineDeploymentName, err)
return nil, err
}

return md.Annotations, nil
}

// GetMachineDeploymentNodeTemplate returns the NodeTemplate of a node belonging to the same worker pool as the machinedeployment
// If no node present then it forms the nodeTemplate using the one present in machineClass
func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *MachineDeployment) (*nodeTemplate, error) {

md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name)
md, err := m.GetMachineDeploymentResource(machinedeployment.Name)
if err != nil {
return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err)
return nil, err
}

var (
Expand Down Expand Up @@ -892,6 +890,16 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine
return nodeTmpl, nil
}

// GetMachineDeploymentResource returns the MachineDeployment object for the
// provided machine deployment name, fetched via the lister in the manager's
// namespace. The error is logged here once so callers don't have to, and it is
// wrapped with %w so callers can still unwrap it (e.g. errors.Is / kube_errors checks).
func (m *McmManager) GetMachineDeploymentResource(mdName string) (*v1alpha1.MachineDeployment, error) {
	md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName)
	if err != nil {
		// Build the contextualized error once instead of duplicating the
		// message in both the log call and the returned error.
		err = fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %w", mdName, err)
		klog.Error(err)
		return nil, err
	}
	return md, nil
}

func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool {
for _, cond := range md.Status.Conditions {
switch {
Expand Down Expand Up @@ -1034,6 +1042,30 @@ func buildGenericLabels(template *nodeTemplate, nodeName string) map[string]stri
return result
}

// buildMachineDeploymentFromSpec parses a node group spec string into a
// MachineDeployment. spec.Name is expected to be "<namespace>.<name>";
// a malformed name is rejected with an error instead of panicking.
func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*MachineDeployment, error) {
	spec, err := dynamic.SpecFromString(value, true)
	if err != nil {
		return nil, fmt.Errorf("failed to parse node group spec: %v", err)
	}
	// Cut at the first "." so that names containing dots keep their suffix;
	// namespaces are RFC 1123 labels and cannot themselves contain dots.
	namespace, name, found := strings.Cut(spec.Name, ".")
	if !found {
		return nil, fmt.Errorf("invalid node group spec name %q: expected format <namespace>.<name>", spec.Name)
	}
	return buildMachineDeployment(mcmManager, spec.MinSize, spec.MaxSize, namespace, name), nil
}

// buildMachineDeployment constructs a MachineDeployment node group for the
// given namespace/name pair with the provided size bounds and manager handle.
func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
	md := &MachineDeployment{
		NamespacedName: types.NamespacedName{
			Namespace: namespace,
			Name:      name,
		},
		mcmManager:   mcmManager,
		minSize:      minSize,
		maxSize:      maxSize,
		scalingMutex: sync.Mutex{},
	}
	return md
}

// isMachineFailedOrTerminating returns true if machine is already being terminated or considered for termination by autoscaler.
func isMachineFailedOrTerminating(machine *v1alpha1.Machine) bool {
if !machine.GetDeletionTimestamp().IsZero() || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
Expand All @@ -1051,3 +1083,7 @@ func filterExtendedResources(allResources v1.ResourceList) (extendedResources v1
})
return
}

// createMachinesMarkedForDeletionAnnotationValue builds the comma-separated
// annotation value from the given machine names; an empty or nil slice
// produces the empty string.
func createMachinesMarkedForDeletionAnnotationValue(machineNames []string) string {
	const separator = ","
	return strings.Join(machineNames, separator)
}

0 comments on commit 9063248

Please sign in to comment.