From 89bd60db995fe64dace5a680ec5afd07efefebb8 Mon Sep 17 00:00:00 2001
From: Michael Francis
Date: Thu, 28 Nov 2024 14:29:42 -0500
Subject: [PATCH] cluster-autoscaler: Upgrade to k8s 1.29 (#1151)

This patch is essentially a rewrite of the original. For context: upstream,
the cluster-autoscaler team unified the watch and lister interfaces using
informers. So instead of patching both of those methods, only the `List()`
method on the `AllPodLister` is modified. Assuming my understanding is
correct, this keeps the same behaviour with much less code (as evidenced by
the lines removed from the original patch).

I'm not sure how to validate that it works as expected other than letting
the e2e tests run, so if there is anything I can do, let me know :)
(A standalone sketch of the new behaviour and a possible unit test follow
the patch below.)
---
 cluster-autoscaler/Dockerfile |   2 +-
 cluster-autoscaler/ca.branch  |   2 +-
 cluster-autoscaler/ca.commit  |   2 +-
 cluster-autoscaler/ca.patch   | 155 +++++++++-------------------------
 4 files changed, 41 insertions(+), 120 deletions(-)

diff --git a/cluster-autoscaler/Dockerfile b/cluster-autoscaler/Dockerfile
index 29fd874bc..0530a1ce8 100644
--- a/cluster-autoscaler/Dockerfile
+++ b/cluster-autoscaler/Dockerfile
@@ -1,6 +1,6 @@
 # NOTE: This must match CA's builder/Dockerfile:
 # https://github.com/kubernetes/autoscaler/blob//builder/Dockerfile
-FROM golang:1.20.12 AS builder
+FROM golang:1.21.6 AS builder
 
 WORKDIR /workspace
 
diff --git a/cluster-autoscaler/ca.branch b/cluster-autoscaler/ca.branch
index 52404bdd7..0a98806c6 100644
--- a/cluster-autoscaler/ca.branch
+++ b/cluster-autoscaler/ca.branch
@@ -1 +1 @@
-cluster-autoscaler-release-1.28
+cluster-autoscaler-release-1.29
diff --git a/cluster-autoscaler/ca.commit b/cluster-autoscaler/ca.commit
index 76f5aef45..f12c734f0 100644
--- a/cluster-autoscaler/ca.commit
+++ b/cluster-autoscaler/ca.commit
@@ -1 +1 @@
-10a229ac17ea8049248d1c3ce2923b94a4f9085c
+d4bbc686ac02a77a6ad1362fe7bbda387e8f074a
diff --git a/cluster-autoscaler/ca.patch b/cluster-autoscaler/ca.patch
index 3d132f7c3..ffcd20b5e 100644
--- a/cluster-autoscaler/ca.patch
+++ b/cluster-autoscaler/ca.patch
@@ -1,45 +1,22 @@
 diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
-index 198fdfb37..d534fc1ef 100644
+index b9be94b6e..5efb40df2 100644
 --- a/cluster-autoscaler/utils/kubernetes/listers.go
 +++ b/cluster-autoscaler/utils/kubernetes/listers.go
-@@ -17,14 +17,19 @@ limitations under the License.
+@@ -17,10 +17,12 @@ limitations under the License.
  package kubernetes
  
  import (
 +	"encoding/json"
  	"time"
  
- 	appsv1 "k8s.io/api/apps/v1"
- 	batchv1 "k8s.io/api/batch/v1"
  	apiv1 "k8s.io/api/core/v1"
  	policyv1 "k8s.io/api/policy/v1"
 +	"k8s.io/apimachinery/pkg/api/resource"
-+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  	"k8s.io/apimachinery/pkg/fields"
  	"k8s.io/apimachinery/pkg/labels"
-+	"k8s.io/apimachinery/pkg/runtime"
-+	"k8s.io/apimachinery/pkg/watch"
- 	client "k8s.io/client-go/kubernetes"
- 	v1appslister "k8s.io/client-go/listers/apps/v1"
- 	v1batchlister "k8s.io/client-go/listers/batch/v1"
-@@ -169,6 +174,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
- 	selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
- 		string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
- 	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
-+	podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
- 	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
- 	podLister := v1lister.NewPodLister(store)
- 	go reflector.Run(stopchannel)
-@@ -212,6 +218,7 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
- 	selector := fields.ParseSelectorOrDie("status.phase!=" +
- 		string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
- 	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
-+	podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
- 	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
- 	podLister := v1lister.NewPodLister(store)
- 	go reflector.Run(stopchannel)
-@@ -221,6 +228,105 @@ func NewScheduledAndUnschedulablePodLister(kubeClient client.Interface, stopchan
- }
+ 	"k8s.io/client-go/informers"
+@@ -46,6 +48,14 @@ type ListerRegistry interface {
+ 	StatefulSetLister() v1appslister.StatefulSetLister
+ }
+ 
 +// copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go.
@@ -50,97 +27,41 @@ index 198fdfb37..d534fc1ef 100644
 +	Memory resource.Quantity `json:"memory"`
 +}
 +
-+func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch {
-+	updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) {
-+		annotation, ok := pod.Annotations["vm.neon.tech/usage"]
-+		if !ok {
-+			return
-+		}
-+
-+		var usage virtualMachineUsage
-+		if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
-+			return
-+		}
-+
-+		pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
-+			apiv1.ResourceCPU:    usage.CPU,
-+			apiv1.ResourceMemory: usage.Memory,
-+		})
+ type listerRegistryImpl struct {
+ 	allNodeLister NodeLister
+ 	readyNodeLister NodeLister
+@@ -221,6 +231,22 @@ type AllPodLister struct {
+ 	podLister v1lister.PodLister
+ }
+ 
++func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
++	annotation, ok := pod.Annotations["vm.neon.tech/usage"]
++	if !ok {
++		return
 +	}
 +
-+	return &cache.ListWatch{
-+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-+			obj, err := lw.List(options)
-+			if err != nil {
-+				return obj, err
-+			}
-+
-+			list := obj.(*apiv1.PodList)
-+			for i := range list.Items {
-+				updatePodRequestsFromNeonVMAnnotation(&list.Items[i])
-+			}
-+			return obj, nil
-+		},
-+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-+			iface, err := lw.Watch(options)
-+			if err != nil {
-+				return iface, err
-+			}
-+
-+			// Wrap the channel to update the pods as they come through
-+			wrappedEvents := make(chan watch.Event)
-+			proxyIface := watch.NewProxyWatcher(wrappedEvents)
-+
-+			go func() {
-+				events := iface.ResultChan()
-+
-+				for {
-+					var ok bool
-+					var ev watch.Event
-+
-+					select {
-+					case <-proxyIface.StopChan():
-+						return
-+					case ev, ok = <-events:
-+						if !ok {
-+							close(wrappedEvents)
-+							return
-+						}
-+					}
-+
-+					// Quoting the docs on watch.Event.Object:
-+					//
-+					// > Object is:
-+					// >  * If Type is Added or Modified: the new state of the object
-+					// >  * If type is Deleted: the state of the object immediately before deletion.
-+					// >  * If Type is Bookmark: the object [ ... ] where only ResourceVersion field
-+					// >    is set.
-+					// >  * If Type is Error: *api.Status is recommended; other types may make sense
-+					// >    depending on context.
-+					//
-+					// So basically, we want to process the object only if ev.Type is Added,
-+					// Modified, or Deleted.
-+					if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted {
-+						pod := ev.Object.(*apiv1.Pod)
-+						updatePodRequestsFromNeonVMAnnotation(pod)
-+					}
-+
-+					// Pass along the maybe-updated event
-+					select {
-+					case <-proxyIface.StopChan():
-+						return
-+					case wrappedEvents <- ev:
-+						// continue on to next event
-+					}
-+				}
-+			}()
-+
-+			return proxyIface, nil
-+		},
-+		DisableChunking: lw.DisableChunking,
++	var usage virtualMachineUsage
++	if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
++		return
 +	}
++	pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
++		apiv1.ResourceCPU:    usage.CPU,
++		apiv1.ResourceMemory: usage.Memory,
++	})
 +}
 +
- // NodeLister lists nodes.
- type NodeLister interface {
- 	List() ([]*apiv1.Node, error)
+ // List returns all scheduled pods.
+ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
+ 	var pods []*apiv1.Pod
+@@ -231,7 +257,10 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
+ 	}
+ 	for _, p := range allPods {
+ 		if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
+-			pods = append(pods, p)
++			// Make a copy before modifying anything: p points at the object in the informer cache.
++			podCopy := p.DeepCopy()
++			updatePodRequestsFromNeonVMAnnotation(podCopy)
++			pods = append(pods, podCopy)
+ 		}
+ 	}
+ 	return pods, nil
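
For illustration, here is a minimal, self-contained sketch (not the patched file itself) of the behaviour the modified `List()` implements: pods come out of an informer-style cache, finished pods are filtered away, and every surviving pod is deep-copied and has its requests rewritten from the `vm.neon.tech/usage` annotation. The pod names and the `main` harness are made up for the example.

package main

import (
	"encoding/json"
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	v1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// Mirrors the struct copied from neonvm/apis/neonvm/v1/virtualmachine_types.go.
type virtualMachineUsage struct {
	CPU    resource.Quantity `json:"cpu"`
	Memory resource.Quantity `json:"memory"`
}

// Same logic as the helper added by the patch: override the first container's
// requests with the values from the vm.neon.tech/usage annotation, if present.
func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
	annotation, ok := pod.Annotations["vm.neon.tech/usage"]
	if !ok {
		return
	}
	var usage virtualMachineUsage
	if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
		return
	}
	pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList{
		apiv1.ResourceCPU:    usage.CPU,
		apiv1.ResourceMemory: usage.Memory,
	}
}

func main() {
	// Stand-in for the informer cache that backs the pod lister.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})

	// A finished pod (filtered out below) and a running NeonVM runner pod.
	_ = indexer.Add(&apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "done", Namespace: "default"},
		Status:     apiv1.PodStatus{Phase: apiv1.PodSucceeded},
	})
	_ = indexer.Add(&apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "vm-runner", // hypothetical pod name
			Namespace:   "default",
			Annotations: map[string]string{"vm.neon.tech/usage": `{"cpu":"2","memory":"4Gi"}`},
		},
		Spec:   apiv1.PodSpec{Containers: []apiv1.Container{{Name: "runner"}}},
		Status: apiv1.PodStatus{Phase: apiv1.PodRunning},
	})

	allPods, _ := v1lister.NewPodLister(indexer).List(labels.Everything())

	// The same filter-and-copy loop the patched List() performs.
	var pods []*apiv1.Pod
	for _, p := range allPods {
		if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
			podCopy := p.DeepCopy() // never mutate the cached object
			updatePodRequestsFromNeonVMAnnotation(podCopy)
			pods = append(pods, podCopy)
		}
	}

	for _, p := range pods {
		req := p.Spec.Containers[0].Resources.Requests
		fmt.Println(p.Name, req.Cpu().String(), req.Memory().String()) // vm-runner 2 4Gi
	}
}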
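
On the validation question above: short of an e2e run, a small unit test next to listers.go could at least pin down the annotation parsing and the no-mutation guarantee. A sketch of what that might look like (pod and container names are again made up):

package kubernetes

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestUpdatePodRequestsFromNeonVMAnnotation(t *testing.T) {
	pod := &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "vm-runner", // hypothetical pod name
			Annotations: map[string]string{"vm.neon.tech/usage": `{"cpu":"2","memory":"4Gi"}`},
		},
		Spec: apiv1.PodSpec{Containers: []apiv1.Container{{Name: "runner"}}},
	}

	// Copy first, exactly as List() does, then rewrite the copy's requests.
	podCopy := pod.DeepCopy()
	updatePodRequestsFromNeonVMAnnotation(podCopy)

	requests := podCopy.Spec.Containers[0].Resources.Requests
	if got := requests.Cpu().String(); got != "2" {
		t.Errorf("cpu request = %q, want %q", got, "2")
	}
	if got := requests.Memory().String(); got != "4Gi" {
		t.Errorf("memory request = %q, want %q", got, "4Gi")
	}
	// The original (cache-resident) object must stay untouched.
	if len(pod.Spec.Containers[0].Resources.Requests) != 0 {
		t.Errorf("original pod was mutated: %v", pod.Spec.Containers[0].Resources.Requests)
	}
}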