Merge pull request #16 from ctrox/container-status-labels
feat: add container status labels
ctrox authored Jun 24, 2024
2 parents 3176b7d + 5d05be0 commit a6e2804
Showing 15 changed files with 382 additions and 97 deletions.
22 changes: 22 additions & 0 deletions README.md
@@ -304,12 +304,34 @@ node driver to patch pods. To deploy this, simply uncomment the
This will add the flag and the required permissions when building the
kustomization.

#### Status Labels

To reflect the container scaling status in the k8s API, the manager can set
status labels on a pod. This requires the flag `-status-labels=true`, which is
set by default in the production deployment.

The resulting labels have the following structure:

```yaml
status.zeropod.ctrox.dev/<container name>: <container status>
```

So if a pod has two containers, one running and the other scaled down, the
labels would be set like this:

```yaml
labels:
  status.zeropod.ctrox.dev/container1: RUNNING
  status.zeropod.ctrox.dev/container2: SCALED_DOWN
```
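
Because these are ordinary pod labels, they can be used in label selectors to
find, say, all scaled-down pods. A minimal sketch using the controller-runtime
client (the function name, namespace handling, and hard-coded phase string are
illustrative, not part of zeropod):

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

// listScaledDown prints every pod in the namespace whose given container
// currently carries the SCALED_DOWN status label.
func listScaledDown(ctx context.Context, namespace, container string) error {
	cfg, err := config.GetConfig()
	if err != nil {
		return err
	}
	c, err := client.New(cfg, client.Options{})
	if err != nil {
		return err
	}
	pods := &corev1.PodList{}
	if err := c.List(ctx, pods,
		client.InNamespace(namespace),
		client.MatchingLabels{"status.zeropod.ctrox.dev/" + container: "SCALED_DOWN"},
	); err != nil {
		return err
	}
	for _, p := range pods.Items {
		fmt.Println(p.Name)
	}
	return nil
}

func main() {
	if err := listScaledDown(context.Background(), "default", "container2"); err != nil {
		panic(err)
	}
}
```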

#### Flags

```
-metrics-addr=":8080" sets the address of the metrics server
-debug enables debug logging
-in-place-scaling=false enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag
-status-labels=false update pod labels to reflect container status
```
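
All flags except `-metrics-addr` default to off. As an illustration, a manual
invocation enabling both optional features could look like the following; the
binary name is assumed from the manager image, and in practice the kustomize
components in this commit inject these flags into the DaemonSet instead:

```
zeropod-manager -status-labels=true -in-place-scaling=true -metrics-addr=":8080"
```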
## Metrics
15 changes: 7 additions & 8 deletions cmd/manager/main.go
@@ -19,6 +19,7 @@ var (
 	debug = flag.Bool("debug", false, "enable debug logs")
 	inPlaceScaling = flag.Bool("in-place-scaling", false,
 		"enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag")
+	statusLabels = flag.Bool("status-labels", false, "update pod labels to reflect container status")
 )
 
 func main() {
@@ -43,17 +44,15 @@ func main() {
 		os.Exit(1)
 	}
 
-	subscribers := []manager.StatusHandler{}
+	podHandlers := []manager.PodHandler{}
+	if *statusLabels {
+		podHandlers = append(podHandlers, manager.NewPodLabeller())
+	}
 	if *inPlaceScaling {
-		podScaler, err := manager.NewPodScaler()
-		if err != nil {
-			slog.Error("podScaler init", "err", err)
-			os.Exit(1)
-		}
-		subscribers = append(subscribers, podScaler)
+		podHandlers = append(podHandlers, manager.NewPodScaler())
 	}
 
-	if err := manager.StartSubscribers(ctx, subscribers...); err != nil {
+	if err := manager.StartSubscribers(ctx, podHandlers...); err != nil {
 		slog.Error("starting subscribers", "err", err)
 		os.Exit(1)
 	}
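
Taken together, the main.go changes swap the old `StatusHandler` subscribers
for pod-aware handlers: `StartSubscribers` now receives handlers that get the
pod alongside each status event. The `PodHandler` interface itself is not shown
in this diff; inferred from the `Handle` signatures of `PodLabeller` and
`PodScaler` below, it presumably looks like this:

```go
package manager

import (
	"context"

	v1 "github.com/ctrox/zeropod/api/shim/v1"
	corev1 "k8s.io/api/core/v1"
)

// PodHandler is satisfied by both PodLabeller and PodScaler in this
// commit: each implements Handle with exactly this signature.
type PodHandler interface {
	Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error
}
```

Passing the pod in directly is also what lets `PodScaler` drop its own
Kubernetes client and `updateRequests` call later in this diff.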
2 changes: 0 additions & 2 deletions config/in-place-scaling/kustomization.yaml
@@ -1,7 +1,5 @@
 apiVersion: kustomize.config.k8s.io/v1alpha1
 kind: Component
-resources:
-- rbac.yaml
 patches:
 - patch: |-
     - op: add
2 changes: 2 additions & 0 deletions config/kind/kustomization.yaml
@@ -2,6 +2,8 @@ resources:
 - ../base
 components:
 - ../in-place-scaling
+- ../pod-updater
+- ../status-labels
 images:
 - name: manager
   newName: ghcr.io/ctrox/zeropod-manager
4 changes: 4 additions & 0 deletions config/pod-updater/kustomization.yaml
@@ -0,0 +1,4 @@
apiVersion: kustomize.config.k8s.io/v1alpha1
kind: Component
resources:
- rbac.yaml
File renamed without changes.
9 changes: 6 additions & 3 deletions config/production/kustomization.yaml
@@ -1,8 +1,11 @@
 resources:
 - ../base
-# uncomment to enable in-place scaling
-# components:
-# - ../in-place-scaling
+# pod-updater is required if status-labels or in-place-scaling is enabled
+components:
+- ../pod-updater
+- ../status-labels
+# uncomment to enable in-place-scaling
+# - ../in-place-scaling
 images:
 - name: installer
   newName: ghcr.io/ctrox/zeropod-installer
9 changes: 9 additions & 0 deletions config/status-labels/kustomization.yaml
@@ -0,0 +1,9 @@
apiVersion: kustomize.config.k8s.io/v1alpha1
kind: Component
patches:
- patch: |-
    - op: add
      path: /spec/template/spec/containers/0/args/-
      value: -status-labels=true
  target:
    kind: DaemonSet
25 changes: 25 additions & 0 deletions e2e/e2e_test.go
@@ -5,10 +5,12 @@ import (
 	"fmt"
 	"net/http"
 	"runtime"
+	"strings"
 	"sync"
 	"testing"
 	"time"
 
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
 	"github.com/ctrox/zeropod/manager"
 	"github.com/ctrox/zeropod/zeropod"
 	"github.com/prometheus/client_golang/prometheus"
@@ -240,6 +242,29 @@ func TestE2E(t *testing.T) {
 		}, time.Minute, time.Second)
 	})
 
+	t.Run("status labels", func(t *testing.T) {
+		pod := testPod(scaleDownAfter(0), agnContainer("agn", 8080), agnContainer("agn2", 8081))
+
+		cleanupPod := createPodAndWait(t, ctx, client, pod)
+		defer cleanupPod()
+		require.Eventually(t, func() bool {
+			if err := client.Get(ctx, objectName(pod), pod); err != nil {
+				return false
+			}
+			labelCount := 0
+			expectedLabels := 2
+			for k, v := range pod.GetLabels() {
+				if strings.HasPrefix(k, manager.StatusLabelKeyPrefix) {
+					if v == v1.ContainerPhase_SCALED_DOWN.String() {
+						labelCount++
+					}
+				}
+			}
+
+			return labelCount == expectedLabels
+		}, time.Minute, time.Second)
+	})
+
 	t.Run("metrics", func(t *testing.T) {
 		// create two pods to test metric merging
 		runningPod := testPod(scaleDownAfter(time.Hour))
40 changes: 40 additions & 0 deletions manager/pod_labeller.go
@@ -0,0 +1,40 @@
package manager

import (
	"context"
	"log/slog"
	"path"

	v1 "github.com/ctrox/zeropod/api/shim/v1"
	corev1 "k8s.io/api/core/v1"
)

const (
	StatusLabelKeyPrefix = "status.zeropod.ctrox.dev"
)

type PodLabeller struct {
	log *slog.Logger
}

func NewPodLabeller() *PodLabeller {
	log := slog.With("component", "podupdater")
	log.Info("init")
	return &PodLabeller{log: log}
}

func (pl *PodLabeller) Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error {
	clog := pl.log.With("container", status.Name, "pod", status.PodName,
		"namespace", status.PodNamespace, "phase", status.Phase)
	clog.Info("status event")

	pl.setLabel(pod, status)
	return nil
}

func (pu *PodLabeller) setLabel(pod *corev1.Pod, status *v1.ContainerStatus) {
	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	pod.Labels[path.Join(StatusLabelKeyPrefix, status.Name)] = status.Phase.String()
}
74 changes: 74 additions & 0 deletions manager/pod_labeller_test.go
@@ -0,0 +1,74 @@
package manager

import (
	"context"
	"log/slog"
	"testing"

	v1 "github.com/ctrox/zeropod/api/shim/v1"
	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
)

func TestPodLabeller(t *testing.T) {
	slog.SetLogLoggerLevel(slog.LevelDebug)

	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		t.Fatal(err)
	}
	cases := map[string]struct {
		statusEventPhase v1.ContainerPhase
		beforeEvent      map[string]string
		expected         map[string]string
	}{
		"no labels set": {
			statusEventPhase: v1.ContainerPhase_RUNNING,
			beforeEvent:      nil,
			expected: map[string]string{
				"status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(),
			},
		},
		"existing labels are kept": {
			statusEventPhase: v1.ContainerPhase_RUNNING,
			beforeEvent:      map[string]string{"existing": "label"},
			expected: map[string]string{
				"existing": "label",
				"status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(),
			},
		},
		"status label is updated": {
			statusEventPhase: v1.ContainerPhase_SCALED_DOWN,
			beforeEvent: map[string]string{
				"status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(),
			},
			expected: map[string]string{
				"status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_SCALED_DOWN.String(),
			},
		},
	}

	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			pod := newPod(nil)
			pod.SetLabels(tc.beforeEvent)

			if err := NewPodLabeller().Handle(
				context.Background(),
				&v1.ContainerStatus{
					Name:         pod.Spec.Containers[0].Name,
					PodName:      pod.Name,
					PodNamespace: pod.Namespace,
					Phase:        tc.statusEventPhase,
				},
				pod,
			); err != nil {
				t.Fatal(err)
			}

			assert.Equal(t, tc.expected, pod.GetLabels())
		})
	}
}
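
The `newPod` helper is defined elsewhere in the manager package and is not part
of this diff. A minimal stand-in that would satisfy this test; its signature
and contents are assumptions based solely on the `newPod(nil)` call and the
expected `first-container` label key:

```go
package manager

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newPod returns a pod with a single container named "first-container",
// matching the label keys asserted above. The ignored argument mirrors
// the newPod(nil) call; its real type is defined with the original helper.
func newPod(_ map[string]corev1.ResourceList) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "first-container"}},
		},
	}
}
```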
42 changes: 4 additions & 38 deletions manager/pod_scaler.go
@@ -8,11 +8,7 @@ import (
 
 	v1 "github.com/ctrox/zeropod/api/shim/v1"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/client/config"
 )
@@ -28,33 +24,20 @@ var (
 type containerResource map[string]resource.Quantity
 
 type PodScaler struct {
-	client client.Client
-	log    *slog.Logger
+	log *slog.Logger
 }
 
-func NewPodScaler() (*PodScaler, error) {
+func NewPodScaler() *PodScaler {
 	log := slog.With("component", "podscaler")
 	log.Info("init")
-	cfg, err := config.GetConfig()
-	if err != nil {
-		return nil, err
-	}
-	c, err := client.New(cfg, client.Options{})
-	return &PodScaler{client: c, log: log}, err
+	return &PodScaler{log: log}
 }
 
-func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error {
+func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error {
 	clog := ps.log.With("container", status.Name, "pod", status.PodName,
 		"namespace", status.PodNamespace, "phase", status.Phase)
 	clog.Info("status event")
 
-	pod := &corev1.Pod{}
-	podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace}
-	if err := ps.client.Get(ctx, podName, pod); err != nil {
-		return err
-	}
-
-	updatePod := false
 	for i, container := range pod.Spec.Containers {
 		if container.Name != status.Name {
 			continue
@@ -85,19 +68,6 @@ func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) err
 		new := ps.newRequests(initial, current, status)
 		pod.Spec.Containers[i].Resources.Requests = new
 		clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(new))
-		updatePod = true
 	}
 
-	if !updatePod {
-		return nil
-	}
-
-	if err := ps.updateRequests(ctx, pod); err != nil {
-		if errors.IsInvalid(err) {
-			clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled")
-			return nil
-		}
-		return err
-	}
-
 	return nil
@@ -187,10 +157,6 @@ func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error {
 	return nil
 }
 
-func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error {
-	return ps.client.Update(ctx, pod)
-}
-
 func printResources(res corev1.ResourceList) string {
 	cpu := res[corev1.ResourceCPU]
 	memory := res[corev1.ResourceMemory]
