Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit d470abf
Author: Michael Francis <[email protected]>
Date:   Fri Nov 22 11:24:13 2024 -0500

    added missing comment

commit 4bbc78a
Merge: 726f4f1 de028a6
Author: Michael Francis <[email protected]>
Date:   Wed Nov 20 11:17:12 2024 -0500

    Merge branch 'main' into chore/upgrade-dev-clusters-1.29-take-two

commit 726f4f1
Author: Michael Francis <[email protected]>
Date:   Tue Nov 19 17:07:48 2024 -0500

    Upgrade to cluster-autoscaler 1.29 - take two
  • Loading branch information
edude03 committed Nov 22, 2024
1 parent 71d71c3 commit 41c2157
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 118 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# NOTE: This must match CA's builder/Dockerfile:
# https://github.com/kubernetes/autoscaler/blob/<GIT_TAG>/builder/Dockerfile
FROM golang:1.20.12 AS builder
FROM golang:1.21.6 AS builder

WORKDIR /workspace

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.branch
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cluster-autoscaler-release-1.28
cluster-autoscaler-release-1.29
2 changes: 1 addition & 1 deletion cluster-autoscaler/ca.commit
Original file line number Diff line number Diff line change
@@ -1 +1 @@
10a229ac17ea8049248d1c3ce2923b94a4f9085c
d4bbc686ac02a77a6ad1362fe7bbda387e8f074a
161 changes: 46 additions & 115 deletions cluster-autoscaler/ca.patch
Original file line number Diff line number Diff line change
@@ -1,37 +1,22 @@
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
index d0033550f..fa3c2ec30 100644
index b9be94b6e..df9dc08a9 100644
--- a/cluster-autoscaler/utils/kubernetes/listers.go
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
@@ -17,14 +17,19 @@ limitations under the License.
@@ -17,10 +17,12 @@ limitations under the License.
package kubernetes

import (
+ "encoding/json"
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
+ "k8s.io/apimachinery/pkg/api/resource"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
client "k8s.io/client-go/kubernetes"
v1appslister "k8s.io/client-go/listers/apps/v1"
v1batchlister "k8s.io/client-go/listers/batch/v1"
@@ -185,6 +190,7 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace
selector := fields.ParseSelectorOrDie("spec.nodeName==" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -209,6 +215,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
selector := fields.ParseSelectorOrDie("spec.nodeName!=" + "" + ",status.phase!=" +
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
+ podListWatch = wrapListWatchWithNeonVMUsage(podListWatch)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
podLister := v1lister.NewPodLister(store)
go reflector.Run(stopchannel)
@@ -218,6 +225,105 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
}
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
@@ -46,6 +48,14 @@ type ListerRegistry interface {
StatefulSetLister() v1appslister.StatefulSetLister
}

+// copied from github.com/neondatabase/autoscaling, neonvm/apis/neonvm/v1/virtualmachine_types.go.
Expand All @@ -42,97 +27,43 @@ index d0033550f..fa3c2ec30 100644
+ Memory resource.Quantity `json:"memory"`
+}
+
+func wrapListWatchWithNeonVMUsage(lw *cache.ListWatch) *cache.ListWatch {
+ updatePodRequestsFromNeonVMAnnotation := func(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
type listerRegistryImpl struct {
allNodeLister NodeLister
readyNodeLister NodeLister
@@ -221,6 +231,22 @@ type AllPodLister struct {
podLister v1lister.PodLister
}

+func updatePodRequestsFromNeonVMAnnotation(pod *apiv1.Pod) {
+ annotation, ok := pod.Annotations["vm.neon.tech/usage"]
+ if !ok {
+ return
+ }
+
+ return &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ obj, err := lw.List(options)
+ if err != nil {
+ return obj, err
+ }
+
+ list := obj.(*apiv1.PodList)
+ for i := range list.Items {
+ updatePodRequestsFromNeonVMAnnotation(&list.Items[i])
+ }
+ return obj, nil
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ iface, err := lw.Watch(options)
+ if err != nil {
+ return iface, err
+ }
+
+ // Wrap the channel to update the pods as they come through
+ wrappedEvents := make(chan watch.Event)
+ proxyIface := watch.NewProxyWatcher(wrappedEvents)
+
+ go func() {
+ events := iface.ResultChan()
+
+ for {
+ var ok bool
+ var ev watch.Event
+
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case ev, ok = <-events:
+ if !ok {
+ close(wrappedEvents)
+ return
+ }
+ }
+
+ // Quoting the docs on watch.Event.Object:
+ //
+ // > Object is:
+ // > * If Type is Added or Modified: the new state of the object
+ // > * If type is Deleted: the state of the object immediately before deletion.
+ // > * If Type is Bookmark: the object [ ... ] where only ResourceVersion field
+ // > is set.
+ // > * If Type is Error: *api.Status is recommended; other types may make sense
+ // > depending on context.
+ //
+ // So basically, we want to process the object only if ev.Type is Added,
+ // Modified, or Deleted.
+ if ev.Type == watch.Added || ev.Type == watch.Modified || ev.Type == watch.Deleted {
+ pod := ev.Object.(*apiv1.Pod)
+ updatePodRequestsFromNeonVMAnnotation(pod)
+ }
+
+ // Pass along the maybe-updated event
+ select {
+ case <-proxyIface.StopChan():
+ return
+ case wrappedEvents <- ev:
+ // continue on to next event
+ }
+ }
+ }()
+
+ return proxyIface, nil
+ },
+ DisableChunking: lw.DisableChunking,
+ var usage virtualMachineUsage
+ if err := json.Unmarshal([]byte(annotation), &usage); err != nil {
+ return
+ }
+ pod.Spec.Containers[0].Resources.Requests = apiv1.ResourceList(map[apiv1.ResourceName]resource.Quantity{
+ apiv1.ResourceCPU: usage.CPU,
+ apiv1.ResourceMemory: usage.Memory,
+ })
+}
+
// NodeLister lists nodes.
type NodeLister interface {
List() ([]*apiv1.Node, error)
// List returns all scheduled pods.
func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
var pods []*apiv1.Pod
@@ -229,9 +255,12 @@ func (lister *AllPodLister) List() ([]*apiv1.Pod, error) {
if err != nil {
return pods, err
}
+
for _, p := range allPods {
if p.Status.Phase != apiv1.PodSucceeded && p.Status.Phase != apiv1.PodFailed {
- pods = append(pods, p)
+ podCopy := p.DeepCopy()
+ updatePodRequestsFromNeonVMAnnotation(podCopy)
+ pods = append(pods, podCopy)
}
}
return pods, nil

0 comments on commit 41c2157

Please sign in to comment.