diff --git a/README.md b/README.md index 314d822..74cd37b 100644 --- a/README.md +++ b/README.md @@ -304,12 +304,34 @@ node driver to patch pods. To deploy this, simply uncomment the This will add the flag and the required permissions when building the kustomization. +#### Status Labels + +To reflect the container scaling status in the k8s API, the manager can set +status labels on a pod. This requires the flag `-status-labels=true`, which is +set by default in the production deployment. + +The resulting labels have the following structure: + +```yaml +status.zeropod.ctrox.dev/<container-name>: <container-status> +``` + +So if our pod has two containers, one of them running and one in scaled-down +state, the labels would be set like this: + +```yaml +labels: + status.zeropod.ctrox.dev/container1: RUNNING + status.zeropod.ctrox.dev/container2: SCALED_DOWN +``` + #### Flags ``` -metrics-addr=":8080" sets the address of the metrics server -debug enables debug logging -in-place-scaling=false enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag +-status-labels=false update pod labels to reflect container status ``` ## Metrics diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7f5918b..c64a10b 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -19,6 +19,7 @@ var ( debug = flag.Bool("debug", false, "enable debug logs") inPlaceScaling = flag.Bool("in-place-scaling", false, "enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag") + statusLabels = flag.Bool("status-labels", false, "update pod labels to reflect container status") ) func main() { @@ -43,17 +44,15 @@ func main() { os.Exit(1) } - subscribers := []manager.StatusHandler{} + podHandlers := []manager.PodHandler{} + if *statusLabels { + podHandlers = append(podHandlers, manager.NewPodLabeller()) + } if *inPlaceScaling { - podScaler, err := manager.NewPodScaler() - if err != nil { - slog.Error("podScaler init", "err", err) - os.Exit(1) - } - subscribers = append(subscribers, podScaler) + podHandlers = append(podHandlers, manager.NewPodScaler()) } - if err := manager.StartSubscribers(ctx, subscribers...); err != nil { + if err := manager.StartSubscribers(ctx, podHandlers...); err != nil { slog.Error("starting subscribers", "err", err) os.Exit(1) } diff --git a/config/in-place-scaling/kustomization.yaml b/config/in-place-scaling/kustomization.yaml index 70abbf1..cf4363c 100644 --- a/config/in-place-scaling/kustomization.yaml +++ b/config/in-place-scaling/kustomization.yaml @@ -1,7 +1,5 @@ apiVersion: kustomize.config.k8s.io/v1alpha1 kind: Component -resources: - - rbac.yaml patches: - patch: |- - op: add diff --git a/config/kind/kustomization.yaml b/config/kind/kustomization.yaml index 87ce003..b28fdf7 100644 --- a/config/kind/kustomization.yaml +++ b/config/kind/kustomization.yaml @@ -2,6 +2,8 @@ resources: - ../base components: - ../in-place-scaling + - ../pod-updater + - ../status-labels images: - name: manager newName: ghcr.io/ctrox/zeropod-manager diff --git a/config/pod-updater/kustomization.yaml b/config/pod-updater/kustomization.yaml new file mode 100644 index 0000000..b8f594c --- /dev/null +++ b/config/pod-updater/kustomization.yaml @@ -0,0 +1,4 @@ +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - rbac.yaml diff --git a/config/in-place-scaling/rbac.yaml b/config/pod-updater/rbac.yaml similarity index 100% rename from config/in-place-scaling/rbac.yaml rename to config/pod-updater/rbac.yaml diff --git a/config/production/kustomization.yaml
b/config/production/kustomization.yaml index b78f029..6a28e6e 100644 --- a/config/production/kustomization.yaml +++ b/config/production/kustomization.yaml @@ -1,8 +1,11 @@ resources: - ../base -# uncommment to enable in-place scaling -# components: -# - ../in-place-scaling +# pod-updater is required if status-labels or in-place-scaling is enabled +components: +- ../pod-updater +- ../status-labels +# uncomment to enable in-place-scaling +# - ../in-place-scaling images: - name: installer newName: ghcr.io/ctrox/zeropod-installer diff --git a/config/status-labels/kustomization.yaml b/config/status-labels/kustomization.yaml new file mode 100644 index 0000000..c330504 --- /dev/null +++ b/config/status-labels/kustomization.yaml @@ -0,0 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +patches: + - patch: |- + - op: add + path: /spec/template/spec/containers/0/args/- + value: -status-labels=true + target: + kind: DaemonSet diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index b4603cc..cb3417b 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -5,10 +5,12 @@ import ( "fmt" "net/http" "runtime" + "strings" "sync" "testing" "time" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/manager" "github.com/ctrox/zeropod/zeropod" "github.com/prometheus/client_golang/prometheus" @@ -240,6 +242,29 @@ func TestE2E(t *testing.T) { }, time.Minute, time.Second) }) + t.Run("status labels", func(t *testing.T) { + pod := testPod(scaleDownAfter(0), agnContainer("agn", 8080), agnContainer("agn2", 8081)) + + cleanupPod := createPodAndWait(t, ctx, client, pod) + defer cleanupPod() + require.Eventually(t, func() bool { + if err := client.Get(ctx, objectName(pod), pod); err != nil { + return false + } + labelCount := 0 + expectedLabels := 2 + for k, v := range pod.GetLabels() { + if strings.HasPrefix(k, manager.StatusLabelKeyPrefix) { + if v == v1.ContainerPhase_SCALED_DOWN.String() { + labelCount++ + } + } + } + + return labelCount == expectedLabels + }, time.Minute, time.Second) + }) + t.Run("metrics", func(t *testing.T) { // create two pods to test metric merging runningPod := testPod(scaleDownAfter(time.Hour)) diff --git a/manager/pod_labeller.go b/manager/pod_labeller.go new file mode 100644 index 0000000..72d6e6f --- /dev/null +++ b/manager/pod_labeller.go @@ -0,0 +1,40 @@ +package manager + +import ( + "context" + "log/slog" + "path" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + corev1 "k8s.io/api/core/v1" +) + +const ( + StatusLabelKeyPrefix = "status.zeropod.ctrox.dev" +) + +type PodLabeller struct { + log *slog.Logger +} + +func NewPodLabeller() *PodLabeller { + log := slog.With("component", "podupdater") + log.Info("init") + return &PodLabeller{log: log} +} + +func (pl *PodLabeller) Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error { + clog := pl.log.With("container", status.Name, "pod", status.PodName, + "namespace", status.PodNamespace, "phase", status.Phase) + clog.Info("status event") + + pl.setLabel(pod, status) + return nil +} + +func (pl *PodLabeller) setLabel(pod *corev1.Pod, status *v1.ContainerStatus) { + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + pod.Labels[path.Join(StatusLabelKeyPrefix, status.Name)] = status.Phase.String() +} diff --git a/manager/pod_labeller_test.go b/manager/pod_labeller_test.go new file mode 100644 index 0000000..8a400b2 --- /dev/null +++ b/manager/pod_labeller_test.go @@ -0,0 +1,74 @@ +package manager + +import ( + "context" + "log/slog" + "testing" + + v1 
"github.com/ctrox/zeropod/api/shim/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestPodLabeller(t *testing.T) { + slog.SetLogLoggerLevel(slog.LevelDebug) + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + cases := map[string]struct { + statusEventPhase v1.ContainerPhase + beforeEvent map[string]string + expected map[string]string + }{ + "no labels set": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: nil, + expected: map[string]string{ + "status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(), + }, + }, + "existing labels are kept": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: map[string]string{"existing": "label"}, + expected: map[string]string{ + "existing": "label", + "status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(), + }, + }, + "status label is updated": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: map[string]string{ + "status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_RUNNING.String(), + }, + expected: map[string]string{ + "status.zeropod.ctrox.dev/first-container": v1.ContainerPhase_SCALED_DOWN.String(), + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + pod := newPod(nil) + pod.SetLabels(tc.beforeEvent) + + if err := NewPodLabeller().Handle( + context.Background(), + &v1.ContainerStatus{ + Name: pod.Spec.Containers[0].Name, + PodName: pod.Name, + PodNamespace: pod.Namespace, + Phase: tc.statusEventPhase, + }, + pod, + ); err != nil { + t.Fatal(err) + } + + assert.Equal(t, pod.GetLabels(), tc.expected) + }) + } +} diff --git a/manager/pod_scaler.go b/manager/pod_scaler.go index 675bef8..830fc4f 100644 --- a/manager/pod_scaler.go +++ b/manager/pod_scaler.go @@ -8,11 +8,7 @@ import ( v1 "github.com/ctrox/zeropod/api/shim/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/config" ) const ( @@ -28,33 +24,20 @@ var ( type containerResource map[string]resource.Quantity type PodScaler struct { - client client.Client - log *slog.Logger + log *slog.Logger } -func NewPodScaler() (*PodScaler, error) { +func NewPodScaler() *PodScaler { log := slog.With("component", "podscaler") log.Info("init") - cfg, err := config.GetConfig() - if err != nil { - return nil, err - } - c, err := client.New(cfg, client.Options{}) - return &PodScaler{client: c, log: log}, err + return &PodScaler{log: log} } -func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error { +func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error { clog := ps.log.With("container", status.Name, "pod", status.PodName, "namespace", status.PodNamespace, "phase", status.Phase) clog.Info("status event") - pod := &corev1.Pod{} - podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace} - if err := ps.client.Get(ctx, podName, pod); err != nil { - return err - } - - updatePod := false for i, container := range pod.Spec.Containers { if container.Name != status.Name { continue @@ -85,19 +68,6 @@ func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) err new := ps.newRequests(initial, current, status) pod.Spec.Containers[i].Resources.Requests = new 
clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(new)) - updatePod = true - } - - if !updatePod { - return nil - } - - if err := ps.updateRequests(ctx, pod); err != nil { - if errors.IsInvalid(err) { - clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled") - return nil - } - return err } return nil @@ -187,10 +157,6 @@ func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error { return nil } -func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error { - return ps.client.Update(ctx, pod) -} - func printResources(res corev1.ResourceList) string { cpu := res[corev1.ResourceCPU] memory := res[corev1.ResourceMemory] diff --git a/manager/pod_scaler_test.go b/manager/pod_scaler_test.go index e49dc04..64b8854 100644 --- a/manager/pod_scaler_test.go +++ b/manager/pod_scaler_test.go @@ -11,8 +11,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" ) func TestHandlePod(t *testing.T) { @@ -78,22 +76,13 @@ func TestHandlePod(t *testing.T) { for name, tc := range cases { tc := tc t.Run(name, func(t *testing.T) { - client := fake.NewClientBuilder().WithScheme(scheme).Build() - ps := &PodScaler{ - client: client, - log: slog.Default(), - } + ps := &PodScaler{log: slog.Default()} initialPod := newPod(corev1.ResourceList{corev1.ResourceCPU: runningCPU, corev1.ResourceMemory: runningMemory}) ps.setAnnotations(initialPod) pod := newPod(tc.beforeEvent) pod.SetAnnotations(initialPod.GetAnnotations()) - ctx := context.Background() - if err := client.Create(ctx, pod); err != nil { - t.Fatal(err) - } - if err := ps.Handle( context.Background(), &v1.ContainerStatus{ @@ -102,14 +91,11 @@ func TestHandlePod(t *testing.T) { PodNamespace: pod.Namespace, Phase: tc.statusEventPhase, }, + pod, ); err != nil { t.Fatal(err) } - if err := client.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil { - t.Fatal(err) - } - assert.Equal(t, pod.Spec.Containers[0].Resources.Requests, tc.expected) }) } @@ -122,12 +108,20 @@ func newPod(req corev1.ResourceList) *corev1.Pod { Namespace: "default", }, Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: "first-container", - Resources: corev1.ResourceRequirements{ - Requests: req, + Containers: []corev1.Container{ + { + Name: "first-container", + Resources: corev1.ResourceRequirements{ + Requests: req, + }, }, - }}, + { + Name: "second-container", + Resources: corev1.ResourceRequirements{ + Requests: req, + }, + }, + }, }, } } diff --git a/manager/status.go b/manager/status.go index 77d2b7d..ea6514e 100644 --- a/manager/status.go +++ b/manager/status.go @@ -1,3 +1,6 @@ +// Package manager contains most of the implementation of the zeropod-manager +// node daemon. It takes care of loading eBPF programs, providing metrics and +// monitoring the shims for status updates. 
package manager import ( @@ -16,8 +19,13 @@ import ( "github.com/ctrox/zeropod/runc/task" "github.com/fsnotify/fsnotify" "google.golang.org/protobuf/types/known/emptypb" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" ) var connectBackoff = wait.Backoff{ @@ -27,11 +35,30 @@ var connectBackoff = wait.Backoff{ Jitter: 0.1, } -type StatusHandler interface { - Handle(context.Context, *v1.ContainerStatus) error +type PodHandler interface { + // Handle a status update with the associated pod. Changes made to + // the pod object will be applied after all handlers have been called. + // Pod status updates are ignored and won't be applied. + Handle(context.Context, *v1.ContainerStatus, *corev1.Pod) error } -func StartSubscribers(ctx context.Context, handlers ...StatusHandler) error { +type subscriber struct { + log *slog.Logger + kube client.Client + subscribeClient v1.Shim_SubscribeStatusClient + podHandlers []PodHandler +} + +func StartSubscribers(ctx context.Context, podHandlers ...PodHandler) error { + cfg, err := config.GetConfig() + if err != nil { + return fmt.Errorf("getting client config: %w", err) + } + kube, err := client.New(cfg, client.Options{}) + if err != nil { + return fmt.Errorf("creating client: %w", err) + } + if _, err := os.Stat(task.ShimSocketPath); errors.Is(err, os.ErrNotExist) { if err := os.Mkdir(task.ShimSocketPath, os.ModePerm); err != nil { return err @@ -46,21 +73,39 @@ func StartSubscribers(ctx context.Context, handlers ...StatusHandler) error { for _, sock := range socks { sock := sock go func() { - if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name()), handlers); err != nil { + if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name()), kube, podHandlers); err != nil { slog.Error("error subscribing", "sock", sock.Name(), "err", err) } }() } - go watchForShims(ctx, handlers) + go watchForShims(ctx, kube, podHandlers) return nil } -func subscribe(ctx context.Context, sock string, handlers []StatusHandler) error { - log := slog.With("sock", sock) - log.Info("subscribing to status events") +func subscribe(ctx context.Context, sock string, kube client.Client, handlers []PodHandler) error { + slog.With("sock", sock).Info("subscribing to status events") + shimClient, err := newShimClient(ctx, sock) + if err != nil { + return err + } + // not sure why but the emptypb needs to be set in order for the subscribe to be received + subscribeClient, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}}) + if err != nil { + return err + } + + s := subscriber{ + log: slog.With("sock", sock), + kube: kube, + subscribeClient: subscribeClient, + podHandlers: handlers, + } + return s.receive(ctx) +} +func newShimClient(ctx context.Context, sock string) (v1.ShimClient, error) { var conn net.Conn // the socket file might exist but it can take bit until the server is // listening. We retry with a backoff. 
@@ -80,40 +125,67 @@ func subscribe(ctx context.Context, sock string, handlers []StatusHandler) error return nil }, ); err != nil { - return err + return nil, err } - shimClient := v1.NewShimClient(ttrpc.NewClient(conn)) - // not sure why but the emptypb needs to be set in order for the subscribe - // to be received - client, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}}) - if err != nil { - return err - } + return v1.NewShimClient(ttrpc.NewClient(conn)), nil +} +func (s *subscriber) receive(ctx context.Context) error { for { - status, err := client.Recv() + status, err := s.subscribeClient.Recv() if err != nil { if err == io.EOF || errors.Is(err, ttrpc.ErrClosed) { - log.Info("subscribe closed") + s.log.Info("subscribe closed") } else { - log.Error("subscribe closed", "err", err) + s.log.Error("subscribe closed", "err", err) } break } clog := slog.With("container", status.Name, "pod", status.PodName, "namespace", status.PodNamespace, "phase", status.Phase) - for _, h := range handlers { - if err := h.Handle(ctx, status); err != nil { - clog.Error("handling status update", "err", err) - } + if err := s.onStatus(ctx, status); err != nil { + clog.Error("handling status update", "err", err) + } + } + + return nil +} + +func (s *subscriber) onStatus(ctx context.Context, status *v1.ContainerStatus) error { + if len(s.podHandlers) > 0 { + if err := s.handlePod(ctx, status); err != nil { + return err + } + } + return nil +} + +func (s *subscriber) handlePod(ctx context.Context, status *v1.ContainerStatus) error { + pod := &corev1.Pod{} + podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace} + if err := s.kube.Get(ctx, podName, pod); err != nil { + return fmt.Errorf("getting pod: %w", err) + } + + for _, p := range s.podHandlers { + if err := p.Handle(ctx, status, pod); err != nil { + return err } } + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + return s.kube.Update(ctx, pod) + }); err != nil { + if apierrors.IsInvalid(err) { + s.log.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled") + } + return err + } return nil } -func watchForShims(ctx context.Context, handlers []StatusHandler) error { +func watchForShims(ctx context.Context, kube client.Client, podHandlers []PodHandler) error { watcher, err := fsnotify.NewWatcher() if err != nil { return err @@ -130,7 +202,7 @@ func watchForShims(ctx context.Context, handlers []StatusHandler) error { switch event.Op { case fsnotify.Create: go func() { - if err := subscribe(ctx, event.Name, handlers); err != nil { + if err := subscribe(ctx, event.Name, kube, podHandlers); err != nil { slog.Error("error subscribing", "sock", event.Name, "err", err) } }() diff --git a/manager/status_test.go b/manager/status_test.go new file mode 100644 index 0000000..f30218c --- /dev/null +++ b/manager/status_test.go @@ -0,0 +1,77 @@ +package manager + +import ( + "context" + "log/slog" + "testing" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +type annotationHandler struct { + annotations map[string]string +} + +func (fh *annotationHandler) Handle(ctx context.Context, status *v1.ContainerStatus, pod *corev1.Pod) error { + pod.SetAnnotations(fh.annotations) + return nil +} + +func TestOnStatus(t *testing.T) { + 
slog.SetLogLoggerLevel(slog.LevelDebug) + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + cases := map[string]struct { + statusEventPhase v1.ContainerPhase + beforeEvent map[string]string + expected map[string]string + podHandlers []PodHandler + }{ + "pod is updated when we have a pod handler": { + statusEventPhase: v1.ContainerPhase_RUNNING, + podHandlers: []PodHandler{&annotationHandler{annotations: map[string]string{"new": "annotation"}}}, + beforeEvent: map[string]string{"some": "annotation"}, + expected: map[string]string{"new": "annotation"}, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(scheme).Build() + pod := newPod(nil) + pod.SetAnnotations(tc.beforeEvent) + + ctx := context.Background() + if err := client.Create(ctx, pod); err != nil { + t.Fatal(err) + } + + sub := subscriber{ + kube: client, + log: slog.Default(), + podHandlers: tc.podHandlers, + } + assert.NoError(t, sub.onStatus(ctx, &v1.ContainerStatus{ + Name: pod.Spec.Containers[0].Name, + PodName: pod.Name, + PodNamespace: pod.Namespace, + Phase: tc.statusEventPhase, + })) + + if err := client.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil { + t.Fatal(err) + } + + assert.Equal(t, pod.GetAnnotations(), tc.expected) + }) + } +}
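
One way to consume the new status labels is a plain label selector. Below is a minimal sketch (not part of this change) using the controller-runtime client the manager already depends on; the container name `agn` is borrowed from the e2e test above and is purely illustrative.

```go
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	cfg, err := config.GetConfig()
	if err != nil {
		panic(err)
	}
	c, err := client.New(cfg, client.Options{})
	if err != nil {
		panic(err)
	}
	// The label key is the prefix joined with the container name; the value is
	// the container phase reported by the shim (e.g. RUNNING or SCALED_DOWN).
	pods := &corev1.PodList{}
	if err := c.List(context.Background(), pods,
		client.MatchingLabels{"status.zeropod.ctrox.dev/agn": "SCALED_DOWN"},
	); err != nil {
		panic(err)
	}
	for _, pod := range pods.Items {
		fmt.Printf("%s/%s has container agn scaled down\n", pod.Namespace, pod.Name)
	}
}
```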