diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 54b5156..1591857 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -32,7 +32,7 @@ jobs:
         run: sudo --preserve-env make test
 
   build:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-24.04
     steps:
      - uses: actions/checkout@v3
@@ -46,10 +46,20 @@ jobs:
        with:
          go-version: "1.22"
 
+      - name: Install protoc-gen-go
+        run: |
+          go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31
+          go install github.com/containerd/ttrpc/cmd/protoc-gen-go-ttrpc@v1.2.4
+
+      - uses: awalsh128/cache-apt-pkgs-action@v1
+        with:
+          packages: protobuf-compiler libprotobuf-dev
+          version: 1.0
+
      - name: build ebpf image
        run: make build-ebpf
 
-      - name: generate ebpf
+      - name: generate ttrpc and ebpf
        run: make generate
 
      - name: check for diff
diff --git a/Makefile b/Makefile
index 06a1981..2fb96ee 100644
--- a/Makefile
+++ b/Makefile
@@ -80,9 +80,16 @@ CFLAGS := -O2 -g -Wall -Werror
 # dependencies installed.
 generate: export BPF_CLANG := $(CLANG)
 generate: export BPF_CFLAGS := $(CFLAGS)
-generate:
+generate: ttrpc
 	docker run --rm -v $(PWD):/app:Z --user $(shell id -u):$(shell id -g) --env=BPF_CLANG="$(CLANG)" --env=BPF_CFLAGS="$(CFLAGS)" $(EBPF_IMAGE)
 
+ttrpc:
+	go mod download
+	cd api/shim/v1; protoc --go_out=. --go_opt=paths=source_relative \
+		--ttrpc_out=. --plugin=protoc-gen-ttrpc=`which protoc-gen-go-ttrpc` \
+		--ttrpc_opt=paths=source_relative *.proto -I. \
+		-I $(shell go env GOMODCACHE)/github.com/prometheus/client_model@v0.5.0
+
 # to improve reproducibility of the bpf builds, we dump the vmlinux.h and
 # store it compressed in git instead of dumping it during the build.
 update-vmlinux:
diff --git a/README.md b/README.md
index c46c452..314d822 100644
--- a/README.md
+++ b/README.md
@@ -285,13 +285,38 @@ the shim otherwise. For example, loading eBPF programs can be quite memory
 intensive so they have been moved from the shim to the manager to keep the
 shim memory usage as minimal as possible.
 
-In addition to that it collects metrics from all the shim processes and
-exposes those metrics on an HTTP endpoint.
+The manager has these responsibilities:
+
+- Loading the eBPF programs that the shim(s) rely on.
+- Collecting metrics from all shim processes and exposing them on an HTTP
+  endpoint for scraping.
+- Subscribing to shim scaling events and adjusting pod resource requests.
+
+#### In-place resource scaling (experimental)
+
+This feature builds on the
+[InPlacePodVerticalScaling](https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/1287-in-place-update-pod-resources)
+feature gate to automatically reduce pod resource requests to a minimum on
+scale-down events and revert them again on scale-up. Besides enabling the
+Kubernetes feature gate, it has to be switched on with the manager flag
+`-in-place-scaling=true`, and the node driver needs additional permissions to
+patch pods. To deploy this, uncomment the `in-place-scaling` component in
+`config/production/kustomization.yaml`; building the kustomization then adds
+both the flag and the required permissions.
+
+#### Flags
+
+```
+-metrics-addr=":8080"   sets the address of the metrics server
+-debug                  enables debug logging
+-in-place-scaling=false enables in-place resource scaling, requires the
+                        InPlacePodVerticalScaling feature gate
+```
 
 ## Metrics
 
 The zeropod-node pod exposes metrics on `0.0.0.0:8080/metrics` in Prometheus
-format on each installed node.
The metrics address can be configured with the +`-metrics-addr` flag. The following metrics are currently available: ```bash # HELP zeropod_checkpoint_duration_seconds The duration of the last checkpoint in seconds. diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go new file mode 100644 index 0000000..c94d8aa --- /dev/null +++ b/api/shim/v1/shim.pb.go @@ -0,0 +1,527 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.21.12 +// source: shim.proto + +package v1 + +import ( + _go "github.com/prometheus/client_model/go" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ContainerPhase int32 + +const ( + ContainerPhase_SCALED_DOWN ContainerPhase = 0 + ContainerPhase_RUNNING ContainerPhase = 1 +) + +// Enum value maps for ContainerPhase. +var ( + ContainerPhase_name = map[int32]string{ + 0: "SCALED_DOWN", + 1: "RUNNING", + } + ContainerPhase_value = map[string]int32{ + "SCALED_DOWN": 0, + "RUNNING": 1, + } +) + +func (x ContainerPhase) Enum() *ContainerPhase { + p := new(ContainerPhase) + *p = x + return p +} + +func (x ContainerPhase) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ContainerPhase) Descriptor() protoreflect.EnumDescriptor { + return file_shim_proto_enumTypes[0].Descriptor() +} + +func (ContainerPhase) Type() protoreflect.EnumType { + return &file_shim_proto_enumTypes[0] +} + +func (x ContainerPhase) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ContainerPhase.Descriptor instead. +func (ContainerPhase) EnumDescriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. 
+func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +func (x *MetricsRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +type SubscribeStatusRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *SubscribeStatusRequest) Reset() { + *x = SubscribeStatusRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeStatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeStatusRequest) ProtoMessage() {} + +func (x *SubscribeStatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeStatusRequest.ProtoReflect.Descriptor instead. +func (*SubscribeStatusRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{1} +} + +func (x *SubscribeStatusRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +type MetricsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metrics []*_go.MetricFamily `protobuf:"bytes,1,rep,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *MetricsResponse) Reset() { + *x = MetricsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsResponse) ProtoMessage() {} + +func (x *MetricsResponse) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. 
+func (*MetricsResponse) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{2} +} + +func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { + if x != nil { + return x.Metrics + } + return nil +} + +type ContainerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *ContainerRequest) Reset() { + *x = ContainerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ContainerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ContainerRequest) ProtoMessage() {} + +func (x *ContainerRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ContainerRequest.ProtoReflect.Descriptor instead. +func (*ContainerRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{3} +} + +func (x *ContainerRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type ContainerStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + PodName string `protobuf:"bytes,3,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` + PodNamespace string `protobuf:"bytes,4,opt,name=pod_namespace,json=podNamespace,proto3" json:"pod_namespace,omitempty"` + Phase ContainerPhase `protobuf:"varint,5,opt,name=phase,proto3,enum=zeropod.shim.v1.ContainerPhase" json:"phase,omitempty"` +} + +func (x *ContainerStatus) Reset() { + *x = ContainerStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ContainerStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ContainerStatus) ProtoMessage() {} + +func (x *ContainerStatus) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ContainerStatus.ProtoReflect.Descriptor instead. 
+func (*ContainerStatus) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{4} +} + +func (x *ContainerStatus) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ContainerStatus) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ContainerStatus) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *ContainerStatus) GetPodNamespace() string { + if x != nil { + return x.PodNamespace + } + return "" +} + +func (x *ContainerStatus) GetPhase() ContainerPhase { + if x != nil { + return x.Phase + } + return ContainerPhase_SCALED_DOWN +} + +var File_shim_proto protoreflect.FileDescriptor + +var file_shim_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x7a, 0x65, + 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, + 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x22, 0x69, 0x6f, 0x2f, 0x70, + 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x3e, + 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x46, + 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, + 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x4f, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x22, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x74, 0x61, + 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xac, 0x01, 0x0a, 0x0f, + 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, + 0x0a, 0x0d, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x6f, 0x64, 
0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, + 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, + 0x61, 0x73, 0x65, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x2a, 0x2e, 0x0a, 0x0e, 0x43, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, 0x61, 0x73, 0x65, 0x12, 0x0f, 0x0a, 0x0b, + 0x53, 0x43, 0x41, 0x4c, 0x45, 0x44, 0x5f, 0x44, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x32, 0x86, 0x02, 0x0a, 0x04, 0x53, + 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x1f, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x50, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, + 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x5e, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, + 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_shim_proto_rawDescOnce sync.Once + file_shim_proto_rawDescData = file_shim_proto_rawDesc +) + +func file_shim_proto_rawDescGZIP() []byte { + file_shim_proto_rawDescOnce.Do(func() { + file_shim_proto_rawDescData = protoimpl.X.CompressGZIP(file_shim_proto_rawDescData) + }) + return file_shim_proto_rawDescData +} + +var file_shim_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_shim_proto_goTypes = []interface{}{ + (ContainerPhase)(0), // 0: zeropod.shim.v1.ContainerPhase + (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest + (*SubscribeStatusRequest)(nil), // 2: zeropod.shim.v1.SubscribeStatusRequest + (*MetricsResponse)(nil), // 3: zeropod.shim.v1.MetricsResponse + (*ContainerRequest)(nil), // 4: zeropod.shim.v1.ContainerRequest + (*ContainerStatus)(nil), // 5: zeropod.shim.v1.ContainerStatus + 
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*_go.MetricFamily)(nil), // 7: io.prometheus.client.MetricFamily +} +var file_shim_proto_depIdxs = []int32{ + 6, // 0: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty + 6, // 1: zeropod.shim.v1.SubscribeStatusRequest.empty:type_name -> google.protobuf.Empty + 7, // 2: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily + 0, // 3: zeropod.shim.v1.ContainerStatus.phase:type_name -> zeropod.shim.v1.ContainerPhase + 1, // 4: zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest + 4, // 5: zeropod.shim.v1.Shim.GetStatus:input_type -> zeropod.shim.v1.ContainerRequest + 2, // 6: zeropod.shim.v1.Shim.SubscribeStatus:input_type -> zeropod.shim.v1.SubscribeStatusRequest + 3, // 7: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse + 5, // 8: zeropod.shim.v1.Shim.GetStatus:output_type -> zeropod.shim.v1.ContainerStatus + 5, // 9: zeropod.shim.v1.Shim.SubscribeStatus:output_type -> zeropod.shim.v1.ContainerStatus + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_shim_proto_init() } +func file_shim_proto_init() { + if File_shim_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_shim_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeStatusRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerStatus); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_shim_proto_rawDesc, + NumEnums: 1, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_shim_proto_goTypes, + DependencyIndexes: file_shim_proto_depIdxs, + EnumInfos: file_shim_proto_enumTypes, + MessageInfos: file_shim_proto_msgTypes, + }.Build() + File_shim_proto = out.File + file_shim_proto_rawDesc = nil + file_shim_proto_goTypes = nil + file_shim_proto_depIdxs = nil +} diff --git a/api/shim/v1/shim.proto b/api/shim/v1/shim.proto new file mode 100644 index 0000000..3c2cc33 --- /dev/null +++ b/api/shim/v1/shim.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package zeropod.shim.v1; +option go_package = 
"github.com/ctrox/zeropod/api/shim/v1/;v1"; + +import "google/protobuf/empty.proto"; +import "io/prometheus/client/metrics.proto"; + +service Shim { + rpc Metrics(MetricsRequest) returns (MetricsResponse); + rpc GetStatus(ContainerRequest) returns (ContainerStatus); + rpc SubscribeStatus(SubscribeStatusRequest) returns (stream ContainerStatus); +} + +message MetricsRequest { + google.protobuf.Empty empty = 1; +} + +message SubscribeStatusRequest { + google.protobuf.Empty empty = 1; +} + +message MetricsResponse { + repeated io.prometheus.client.MetricFamily metrics = 1; +} + +message ContainerRequest { + string id = 1; +} + +enum ContainerPhase { + SCALED_DOWN = 0; + RUNNING = 1; +} + +message ContainerStatus { + string id = 1; + string name = 2; + string pod_name = 3; + string pod_namespace = 4; + ContainerPhase phase = 5; +} diff --git a/api/shim/v1/shim_ttrpc.pb.go b/api/shim/v1/shim_ttrpc.pb.go new file mode 100644 index 0000000..de3f60d --- /dev/null +++ b/api/shim/v1/shim_ttrpc.pb.go @@ -0,0 +1,122 @@ +// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT. +// source: shim.proto +package v1 + +import ( + context "context" + ttrpc "github.com/containerd/ttrpc" +) + +type ShimService interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, *SubscribeStatusRequest, Shim_SubscribeStatusServer) error +} + +type Shim_SubscribeStatusServer interface { + Send(*ContainerStatus) error + ttrpc.StreamServer +} + +type shimSubscribeStatusServer struct { + ttrpc.StreamServer +} + +func (x *shimSubscribeStatusServer) Send(m *ContainerStatus) error { + return x.StreamServer.SendMsg(m) +} + +func RegisterShimService(srv *ttrpc.Server, svc ShimService) { + srv.RegisterService("zeropod.shim.v1.Shim", &ttrpc.ServiceDesc{ + Methods: map[string]ttrpc.Method{ + "Metrics": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req MetricsRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Metrics(ctx, &req) + }, + "GetStatus": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ContainerRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.GetStatus(ctx, &req) + }, + }, + Streams: map[string]ttrpc.Stream{ + "SubscribeStatus": { + Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) { + m := new(SubscribeStatusRequest) + if err := stream.RecvMsg(m); err != nil { + return nil, err + } + return nil, svc.SubscribeStatus(ctx, m, &shimSubscribeStatusServer{stream}) + }, + StreamingClient: false, + StreamingServer: true, + }, + }, + }) +} + +type ShimClient interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) +} + +type shimClient struct { + client *ttrpc.Client +} + +func NewShimClient(client *ttrpc.Client) ShimClient { + return &shimClient{ + client: client, + } +} + +func (c *shimClient) Metrics(ctx context.Context, req *MetricsRequest) (*MetricsResponse, error) { + var resp MetricsResponse + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "Metrics", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) GetStatus(ctx context.Context, req *ContainerRequest) 
(*ContainerStatus, error) { + var resp ContainerStatus + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "GetStatus", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *shimClient) SubscribeStatus(ctx context.Context, req *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) { + stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{ + StreamingClient: false, + StreamingServer: true, + }, "zeropod.shim.v1.Shim", "SubscribeStatus", req) + if err != nil { + return nil, err + } + x := &shimSubscribeStatusClient{stream} + return x, nil +} + +type Shim_SubscribeStatusClient interface { + Recv() (*ContainerStatus, error) + ttrpc.ClientStream +} + +type shimSubscribeStatusClient struct { + ttrpc.ClientStream +} + +func (x *shimSubscribeStatusClient) Recv() (*ContainerStatus, error) { + m := new(ContainerStatus) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index bec987d..3ef59f4 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -15,11 +15,18 @@ import ( ) var ( - metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + debug = flag.Bool("debug", true, "enable debug logs") + inPlaceScaling = flag.Bool("in-place-scaling", false, + "enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag") ) func main() { flag.Parse() + + if *debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } slog.Info("starting manager", "metrics-addr", *metricsAddr) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -36,6 +43,21 @@ func main() { os.Exit(1) } + subscribers := []manager.StatusHandler{} + if *inPlaceScaling { + podScaler, err := manager.NewPodScaler() + if err != nil { + slog.Error("podScaler init", "err", err) + os.Exit(1) + } + subscribers = append(subscribers, podScaler) + } + + if err := manager.StartSubscribers(ctx, subscribers...); err != nil { + slog.Error("starting subscribers", "err", err) + os.Exit(1) + } + server := &http.Server{Addr: *metricsAddr} http.HandleFunc("/metrics", manager.Handler) diff --git a/config/base/node-daemonset.yaml b/config/base/node-daemonset.yaml index e4a2452..87fad0b 100644 --- a/config/base/node-daemonset.yaml +++ b/config/base/node-daemonset.yaml @@ -52,6 +52,9 @@ spec: - name: manager image: manager imagePullPolicy: IfNotPresent + command: ["/zeropod-manager"] + args: + - -metrics-addr=:8080 ports: - name: metrics containerPort: 8080 diff --git a/config/base/rbac.yaml b/config/base/rbac.yaml index 8ebdd8e..aab8fac 100644 --- a/config/base/rbac.yaml +++ b/config/base/rbac.yaml @@ -8,7 +8,7 @@ metadata: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer rules: - apiGroups: - node.k8s.io @@ -22,11 +22,11 @@ rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: runtimeclass-installer + name: zeropod:runtimeclass-installer subjects: - kind: ServiceAccount name: zeropod-node diff --git a/config/examples/nginx.yaml b/config/examples/nginx.yaml index 25bd61f..62e4804 100644 --- a/config/examples/nginx.yaml +++ b/config/examples/nginx.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: nginx 
 spec:
-  replicas: 3
+  replicas: 1
   selector:
     matchLabels:
       app: nginx
@@ -21,3 +21,7 @@ spec:
         name: nginx
         ports:
         - containerPort: 80
+        resources:
+          requests:
+            cpu: 100m
+            memory: 128Mi
diff --git a/config/in-place-scaling/kustomization.yaml b/config/in-place-scaling/kustomization.yaml
new file mode 100644
index 0000000..70abbf1
--- /dev/null
+++ b/config/in-place-scaling/kustomization.yaml
@@ -0,0 +1,11 @@
+apiVersion: kustomize.config.k8s.io/v1alpha1
+kind: Component
+resources:
+  - rbac.yaml
+patches:
+  - patch: |-
+      - op: add
+        path: /spec/template/spec/containers/0/args/-
+        value: -in-place-scaling=true
+    target:
+      kind: DaemonSet
diff --git a/config/in-place-scaling/rbac.yaml b/config/in-place-scaling/rbac.yaml
new file mode 100644
index 0000000..0bc9b66
--- /dev/null
+++ b/config/in-place-scaling/rbac.yaml
@@ -0,0 +1,26 @@
+# the manager needs to get/update pods for dynamic resource requests
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: zeropod:pod-updater
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+    verbs:
+      - get
+      - update
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: zeropod:pod-updater
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: zeropod:pod-updater
+subjects:
+  - kind: ServiceAccount
+    name: zeropod-node
+    namespace: zeropod-system
diff --git a/config/kind/kustomization.yaml b/config/kind/kustomization.yaml
index 559dc9b..eb58447 100644
--- a/config/kind/kustomization.yaml
+++ b/config/kind/kustomization.yaml
@@ -1,5 +1,7 @@
 resources:
   - ../base
+components:
+  - ../in-place-scaling
 images:
   - name: manager
     newName: ghcr.io/ctrox/zeropod-manager
diff --git a/config/production/kustomization.yaml b/config/production/kustomization.yaml
index f3848d9..eadd65d 100644
--- a/config/production/kustomization.yaml
+++ b/config/production/kustomization.yaml
@@ -14,3 +14,6 @@ patches:
         value: -criu-image=ghcr.io/ctrox/zeropod-criu:v3.19
     target:
       kind: DaemonSet
+# uncomment to enable in-place scaling
+# components:
+#   - ../in-place-scaling
diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go
index f4f36df..8e18176 100644
--- a/e2e/e2e_test.go
+++ b/e2e/e2e_test.go
@@ -9,11 +9,13 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ctrox/zeropod/manager"
 	"github.com/ctrox/zeropod/zeropod"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/utils/ptr"
 )
 
@@ -193,6 +195,33 @@ func TestE2E(t *testing.T) {
 		// exec and should test the deletion in the restored state.
}) + t.Run("resources scaling", func(t *testing.T) { + pod := testPod(scaleDownAfter(0), agnContainer("agn", 8080), resources(corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + })) + + cleanupPod := createPodAndWait(t, ctx, client, pod) + defer cleanupPod() + require.Eventually(t, func() bool { + if err := client.Get(ctx, objectName(pod), pod); err != nil { + return false + } + + resourcesScaledDown := false + for _, container := range pod.Status.ContainerStatuses { + t.Logf("allocated resources: %v", container.AllocatedResources) + resourcesScaledDown = container.AllocatedResources != nil && + container.AllocatedResources[corev1.ResourceCPU] == manager.ScaledDownCPU && + container.AllocatedResources[corev1.ResourceMemory] == manager.ScaledDownMemory + } + + return resourcesScaledDown + }, time.Second*10, time.Second) + }) + t.Run("metrics", func(t *testing.T) { // create two pods to test metric merging runningPod := testPod(scaleDownAfter(time.Hour)) diff --git a/e2e/kind.yaml b/e2e/kind.yaml index 911779e..8209778 100644 --- a/e2e/kind.yaml +++ b/e2e/kind.yaml @@ -1,5 +1,7 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 +featureGates: + InPlacePodVerticalScaling: true nodes: - role: control-plane extraMounts: diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 0373aa4..4e2c1e4 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -121,6 +121,9 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error) if err := provider.Create(name, cluster.CreateWithV1Alpha4Config(&v1alpha4.Cluster{ Name: name, + FeatureGates: map[string]bool{ + "InPlacePodVerticalScaling": true, + }, Nodes: []v1alpha4.Node{{ Labels: map[string]string{zeropod.NodeLabel: "true"}, // setup port map for our node port @@ -349,6 +352,14 @@ func portsAnnotation(portsMap string) podOption { }) } +func resources(res corev1.ResourceRequirements) podOption { + return func(p *pod) { + for i := range p.Spec.Containers { + p.Spec.Containers[i].Resources = res + } + } +} + const agnHostImage = "registry.k8s.io/e2e-test-images/agnhost:2.39" func agnContainer(name string, port int) podOption { @@ -536,7 +547,7 @@ func restoreCount(t testing.TB, client client.Client, cfg *rest.Config, pod *cor restoreDuration := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricRestoreDuration) val, ok := mfs[restoreDuration] if !ok { - t.Errorf("could not find expected metric: %s", restoreDuration) + t.Fatalf("could not find expected metric: %s", restoreDuration) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ -544,15 +555,15 @@ func restoreCount(t testing.TB, client client.Client, cfg *rest.Config, pod *cor zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Histogram == nil { - t.Errorf("found metric that is not a histogram") + t.Fatalf("found metric that is not a histogram") } if metric.Histogram.SampleCount == nil { - t.Errorf("histogram sample count is nil") + t.Fatalf("histogram sample count is nil") } return int(*metric.Histogram.SampleCount) @@ -564,7 +575,7 @@ func checkpointCount(t testing.TB, client client.Client, cfg *rest.Config, pod * checkpointDuration := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricCheckPointDuration) 
val, ok := mfs[checkpointDuration] if !ok { - t.Errorf("could not find expected metric: %s", checkpointDuration) + t.Fatalf("could not find expected metric: %s", checkpointDuration) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ -572,15 +583,15 @@ func checkpointCount(t testing.TB, client client.Client, cfg *rest.Config, pod * zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Histogram == nil { - t.Errorf("found metric that is not a histogram") + t.Fatalf("found metric that is not a histogram") } if metric.Histogram.SampleCount == nil { - t.Errorf("histogram sample count is nil") + t.Fatalf("histogram sample count is nil") } return int(*metric.Histogram.SampleCount) @@ -592,7 +603,7 @@ func isCheckpointed(t testing.TB, client client.Client, cfg *rest.Config, pod *c running := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricRunning) val, ok := mfs[running] if !ok { - t.Errorf("could not find expected metric: %s", running) + t.Fatalf("could not find expected metric: %s", running) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ -600,15 +611,15 @@ func isCheckpointed(t testing.TB, client client.Client, cfg *rest.Config, pod *c zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Gauge == nil { - t.Errorf("found metric that is not a gauge") + t.Fatalf("found metric that is not a gauge") } if metric.Gauge.Value == nil { - t.Errorf("gauge value is nil") + t.Fatalf("gauge value is nil") } return *metric.Gauge.Value == 0 && checkpointCount(t, client, cfg, pod) >= 1 diff --git a/go.mod b/go.mod index a51322e..c939e4e 100644 --- a/go.mod +++ b/go.mod @@ -27,8 +27,9 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 github.com/vishvananda/netlink v1.2.1-beta.2 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/sys v0.19.0 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.1 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 @@ -58,6 +59,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.2 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-errors/errors v1.4.2 // indirect @@ -69,7 +71,7 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -116,7 +118,6 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // 
indirect golang.org/x/oauth2 v0.16.0 // indirect diff --git a/go.sum b/go.sum index 5611955..348374a 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc= github.com/evanphx/json-patch/v5 v5.7.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= @@ -135,8 +137,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -485,8 +487,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/manager/metrics_collector.go b/manager/metrics_collector.go index f56bd1f..65988e7 100644 --- a/manager/metrics_collector.go +++ b/manager/metrics_collector.go @@ -1,19 +1,21 @@ package manager import ( - "fmt" + "context" "io" "log/slog" "net" "net/http" "os" "path/filepath" - "sort" + "slices" - "github.com/ctrox/zeropod/zeropod" + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/runc/task" dto 
"github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" + "golang.org/x/exp/maps" ) func Handler(w http.ResponseWriter, req *http.Request) { @@ -23,42 +25,40 @@ func Handler(w http.ResponseWriter, req *http.Request) { // fetchMetricsAndMerge gets metrics from each socket, merges them together // and writes them to w. func fetchMetricsAndMerge(w io.Writer) { - socks, err := os.ReadDir(zeropod.MetricsSocketPath) + socks, err := os.ReadDir(task.ShimSocketPath) if err != nil { - slog.Error("error listing file in metrics socket path", "path", zeropod.MetricsSocketPath, "err", err) + slog.Error("error listing file in shim socket path", "path", task.ShimSocketPath, "err", err) return } mfs := map[string]*dto.MetricFamily{} for _, sock := range socks { - sockName := filepath.Join(zeropod.MetricsSocketPath, sock.Name()) - slog.Info("reading sock", "name", sockName) + sockName := filepath.Join(task.ShimSocketPath, sock.Name()) + slog.Debug("getting metrics", "name", sockName) - res, err := getMetrics(sockName) + shimMetrics, err := getMetricsOverTTRPC(context.Background(), sockName) if err != nil { slog.Error("getting metrics", "err", err) // we still want to read the rest of the sockets continue } + for _, mf := range shimMetrics { + if mf.Name == nil { + continue + } - for n, mf := range res { - mfo, ok := mfs[n] + mfo, ok := mfs[*mf.Name] if ok { mfo.Metric = append(mfo.Metric, mf.Metric...) } else { - mfs[n] = mf + mfs[*mf.Name] = mf } } } - - names := []string{} - for n := range mfs { - names = append(names, n) - } - sort.Strings(names) - + keys := maps.Keys(mfs) + slices.Sort(keys) enc := expfmt.NewEncoder(w, expfmt.FmtText) - for _, n := range names { + for _, n := range keys { err := enc.Encode(mfs[n]) if err != nil { slog.Error("encoding metrics", "err", err) @@ -67,30 +67,16 @@ func fetchMetricsAndMerge(w io.Writer) { } } -func getMetrics(sock string) (map[string]*dto.MetricFamily, error) { - tr := &http.Transport{ - Dial: func(proto, addr string) (conn net.Conn, err error) { - return net.Dial("unix", sock) - }, - } - - client := &http.Client{Transport: tr} - - // the host does not seem to matter when using unix sockets - resp, err := client.Get("http://localhost/metrics") +func getMetricsOverTTRPC(ctx context.Context, sock string) ([]*dto.MetricFamily, error) { + conn, err := net.Dial("unix", sock) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("expected status 200, got %v", resp.StatusCode) - } - - var parser expfmt.TextParser - mfs, err := parser.TextToMetricFamilies(resp.Body) + resp, err := v1.NewShimClient(ttrpc.NewClient(conn)).Metrics(ctx, &v1.MetricsRequest{}) if err != nil { return nil, err } - return mfs, nil + return resp.Metrics, nil } diff --git a/manager/pod_scaler.go b/manager/pod_scaler.go new file mode 100644 index 0000000..ae4bf3e --- /dev/null +++ b/manager/pod_scaler.go @@ -0,0 +1,196 @@ +package manager + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +const ( + CPUAnnotationKey = "zeropod.ctrox.dev/cpu-requests" + MemoryAnnotationKey = "zeropod.ctrox.dev/memory-requests" +) + +var ( + ScaledDownCPU = resource.MustParse("1m") + ScaledDownMemory = resource.MustParse("1Ki") +) + +type 
containerResource map[string]resource.Quantity
+
+type PodScaler struct {
+	client client.Client
+}
+
+func NewPodScaler() (*PodScaler, error) {
+	slog.Info("pod scaler init")
+	cfg, err := config.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	c, err := client.New(cfg, client.Options{})
+	return &PodScaler{client: c}, err
+}
+
+func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error {
+	clog := slog.With("container", status.Name, "pod", status.PodName,
+		"namespace", status.PodNamespace, "phase", status.Phase)
+	clog.Info("handling pod")
+
+	pod := &corev1.Pod{}
+	podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace}
+	if err := ps.client.Get(ctx, podName, pod); err != nil {
+		return err
+	}
+
+	updatePod := false
+	for i, container := range pod.Spec.Containers {
+		if container.Name != status.Name {
+			continue
+		}
+
+		_, hasCPU := container.Resources.Requests[corev1.ResourceCPU]
+		_, hasMemory := container.Resources.Requests[corev1.ResourceMemory]
+		if !hasCPU || !hasMemory {
+			clog.Debug("ignoring container without resources")
+			continue
+		}
+
+		initial, err := ps.initialRequests(container, pod.Annotations)
+		if err != nil {
+			return fmt.Errorf("getting initial requests from pod failed: %w", err)
+		}
+
+		current := container.Resources.Requests
+		if ps.isUpToDate(initial, current, status) {
+			clog.Debug("container is up to date", "initial", printResources(initial))
+			continue
+		}
+
+		if err := ps.setAnnotations(pod); err != nil {
+			return err
+		}
+
+		new := ps.newRequests(initial, current, status)
+		pod.Spec.Containers[i].Resources.Requests = new
+		clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(new))
+		updatePod = true
+	}
+
+	if !updatePod {
+		return nil
+	}
+
+	if err := ps.updateRequests(ctx, pod); err != nil {
+		if errors.IsInvalid(err) {
+			clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled")
+			return nil
+		}
+		return err
+	}
+
+	return nil
+}
+
+func (ps *PodScaler) isUpToDate(initial, current corev1.ResourceList, status *v1.ContainerStatus) bool {
+	switch status.Phase {
+	case v1.ContainerPhase_SCALED_DOWN:
+		return current[corev1.ResourceCPU] == ScaledDownCPU &&
+			current[corev1.ResourceMemory] == ScaledDownMemory
+	case v1.ContainerPhase_RUNNING:
+		return current[corev1.ResourceCPU] == initial[corev1.ResourceCPU] &&
+			current[corev1.ResourceMemory] == initial[corev1.ResourceMemory]
+	default:
+		return true
+	}
+}
+
+func (ps *PodScaler) newRequests(initial, current corev1.ResourceList, status *v1.ContainerStatus) corev1.ResourceList {
+	switch status.Phase {
+	case v1.ContainerPhase_SCALED_DOWN:
+		current[corev1.ResourceCPU] = ScaledDownCPU
+		current[corev1.ResourceMemory] = ScaledDownMemory
+		return current
+	case v1.ContainerPhase_RUNNING:
+		return initial
+	default:
+		return current
+	}
+}
+
+func (ps *PodScaler) initialRequests(container corev1.Container, podAnnotations map[string]string) (corev1.ResourceList, error) {
+	initial := container.DeepCopy().Resources.Requests
+	containerCPUs := containerResource{}
+	if cpuReq, ok := podAnnotations[CPUAnnotationKey]; ok {
+		if err := json.Unmarshal([]byte(cpuReq), &containerCPUs); err != nil {
+			return nil, err
+		}
+	}
+
+	containerMemory := containerResource{}
+	if memoryReq, ok := podAnnotations[MemoryAnnotationKey]; ok {
+		if err := json.Unmarshal([]byte(memoryReq), &containerMemory); err != nil {
+			return nil, err
+		}
+	}
+
+	if cpu, ok := 
containerCPUs[container.Name]; ok { + initial[corev1.ResourceCPU] = cpu + } + + if memory, ok := containerMemory[container.Name]; ok { + initial[corev1.ResourceMemory] = memory + } + + return initial, nil +} + +func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error { + containerCPUs := containerResource{} + containerMemory := containerResource{} + for _, container := range pod.Spec.Containers { + containerCPUs[container.Name] = container.Resources.Requests[corev1.ResourceCPU] + containerMemory[container.Name] = container.Resources.Requests[corev1.ResourceMemory] + } + + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + + if _, ok := pod.Annotations[CPUAnnotationKey]; !ok { + val, err := json.Marshal(containerCPUs) + if err != nil { + return err + } + pod.Annotations[CPUAnnotationKey] = string(val) + } + + if _, ok := pod.Annotations[MemoryAnnotationKey]; !ok { + val, err := json.Marshal(containerMemory) + if err != nil { + return err + } + pod.Annotations[MemoryAnnotationKey] = string(val) + } + + return nil +} + +func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error { + return ps.client.Update(ctx, pod) +} + +func printResources(res corev1.ResourceList) string { + cpu := res[corev1.ResourceCPU] + memory := res[corev1.ResourceMemory] + return fmt.Sprintf("cpu: %s, memory: %s", cpu.String(), memory.String()) +} diff --git a/manager/pod_scaler_test.go b/manager/pod_scaler_test.go new file mode 100644 index 0000000..8b1f978 --- /dev/null +++ b/manager/pod_scaler_test.go @@ -0,0 +1,132 @@ +package manager + +import ( + "context" + "log/slog" + "testing" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestHandlePod(t *testing.T) { + slog.SetLogLoggerLevel(slog.LevelDebug) + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + runningCPU, runningMemory := resource.MustParse("100m"), resource.MustParse("100Mi") + + cases := map[string]struct { + statusEventPhase v1.ContainerPhase + beforeEvent corev1.ResourceList + expected corev1.ResourceList + }{ + "running pod is not updated": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + }, + "running is updated when scaling down": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod is not updated": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod requests are restored when starting": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: 
ScaledDownMemory,
+			},
+			expected: corev1.ResourceList{
+				corev1.ResourceCPU:    runningCPU,
+				corev1.ResourceMemory: runningMemory,
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		tc := tc
+		t.Run(name, func(t *testing.T) {
+			client := fake.NewClientBuilder().WithScheme(scheme).Build()
+			ps := &PodScaler{
+				client: client,
+			}
+
+			initialPod := newPod(corev1.ResourceList{corev1.ResourceCPU: runningCPU, corev1.ResourceMemory: runningMemory})
+			assert.NoError(t, ps.setAnnotations(initialPod))
+			pod := newPod(tc.beforeEvent)
+			pod.SetAnnotations(initialPod.GetAnnotations())
+
+			ctx := context.Background()
+			if err := client.Create(ctx, pod); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := ps.Handle(
+				ctx,
+				&v1.ContainerStatus{
+					Name:         pod.Spec.Containers[0].Name,
+					PodName:      pod.Name,
+					PodNamespace: pod.Namespace,
+					Phase:        tc.statusEventPhase,
+				},
+			); err != nil {
+				t.Fatal(err)
+			}
+
+			if err := client.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil {
+				t.Fatal(err)
+			}
+
+			assert.Equal(t, tc.expected, pod.Spec.Containers[0].Resources.Requests)
+		})
+	}
+}
+
+func newPod(req corev1.ResourceList) *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "scaled-pod",
+			Namespace: "default",
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{{
+				Name: "first-container",
+				Resources: corev1.ResourceRequirements{
+					Requests: req,
+				},
+			}},
+		},
+	}
+}
diff --git a/manager/status.go b/manager/status.go
new file mode 100644
index 0000000..0279b8c
--- /dev/null
+++ b/manager/status.go
@@ -0,0 +1,123 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log/slog"
+	"net"
+	"os"
+	"path/filepath"
+
+	"github.com/containerd/ttrpc"
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
+	"github.com/ctrox/zeropod/runc/task"
+	"github.com/fsnotify/fsnotify"
+	"google.golang.org/protobuf/types/known/emptypb"
+)
+
+type StatusHandler interface {
+	Handle(context.Context, *v1.ContainerStatus) error
+}
+
+func StartSubscribers(ctx context.Context, handlers ...StatusHandler) error {
+	if _, err := os.Stat(task.ShimSocketPath); errors.Is(err, os.ErrNotExist) {
+		if err := os.Mkdir(task.ShimSocketPath, os.ModePerm); err != nil {
+			return err
+		}
+	}
+
+	socks, err := os.ReadDir(task.ShimSocketPath)
+	if err != nil {
+		return fmt.Errorf("error listing files in shim socket path: %w", err)
+	}
+
+	for _, sock := range socks {
+		sock := sock
+		go func() {
+			if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name()), handlers); err != nil {
+				slog.Error("error subscribing", "sock", sock.Name(), "err", err)
+			}
+		}()
+	}
+
+	go func() {
+		if err := watchForShims(ctx, handlers); err != nil {
+			slog.Error("error watching for shims", "err", err)
+		}
+	}()
+
+	return nil
+}
+
+func subscribe(ctx context.Context, sock string, handlers []StatusHandler) error {
+	log := slog.With("sock", sock)
+	log.Info("subscribing to status events")
+
+	conn, err := net.Dial("unix", sock)
+	if err != nil {
+		return err
+	}
+
+	shimClient := v1.NewShimClient(ttrpc.NewClient(conn))
+	// not sure why but the emptypb needs to be set in order for the subscribe
+	// to be received
+	client, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}})
+	if err != nil {
+		return err
+	}
+
+	for {
+		status, err := client.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) || errors.Is(err, ttrpc.ErrClosed) {
+				log.Info("subscribe closed")
+			} else {
+				log.Error("subscribe closed", "err", err)
+			}
+			break
+		}
+		clog := slog.With("container", status.Name, "pod", status.PodName,
+			"namespace", status.PodNamespace, "phase", status.Phase)
+		for _, h := range handlers {
+			if err := h.Handle(ctx, status); err != nil {
+				clog.Error("handling status update", "err", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+func watchForShims(ctx context.Context, handlers []StatusHandler) error {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return err
+	}
+	defer watcher.Close()
+
+	if err := watcher.Add(task.ShimSocketPath); err != nil {
+		return err
+	}
+
+	for {
+		select {
+		case event := <-watcher.Events:
+			switch event.Op {
+			case fsnotify.Create:
+				// subscribe blocks until the shim connection closes, so run
+				// it in a goroutine to keep watching for new shim sockets
+				go func() {
+					if err := subscribe(ctx, event.Name, handlers); err != nil {
+						slog.Error("error subscribing", "sock", event.Name, "err", err)
+					}
+				}()
+			}
+		case err := <-watcher.Errors:
+			slog.Error("watch error", "err", err)
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go
index 1f022e1..9af5be1 100644
--- a/runc/task/service_zeropod.go
+++ b/runc/task/service_zeropod.go
@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
 	"github.com/ctrox/zeropod/zeropod"
 	"google.golang.org/protobuf/types/known/emptypb"
 
@@ -58,8 +59,9 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow
 	w := &wrapper{
 		service:           s,
-		zeropodContainers: make(map[string]*zeropod.Container),
 		checkpointRestore: sync.Mutex{},
+		zeropodContainers: make(map[string]*zeropod.Container),
+		zeropodEvents:     make(chan *v1.ContainerStatus, 128),
 	}
 	go w.processExits()
 	runcC.Monitor = reaper.Default
 
@@ -80,7 +82,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow
 		return shim.RemoveSocket(address)
 	})
 
-	go zeropod.StartMetricsServer(ctx, filepath.Base(address))
+	go startShimServer(ctx, filepath.Base(address), w)
 
 	return w, nil
 }
@@ -91,6 +93,7 @@ type wrapper struct {
 	mut               sync.Mutex
 	checkpointRestore sync.Mutex
 	zeropodContainers map[string]*zeropod.Container
+	zeropodEvents     chan *v1.ContainerStatus
 }
 
 func (w *wrapper) RegisterTTRPC(server *ttrpc.Server) error {
@@ -136,7 +139,7 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
 	log.G(ctx).Infof("creating zeropod container: %s", cfg.ContainerName)
 
-	zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform)
+	zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform, w.zeropodEvents)
 	if err != nil {
 		return nil, fmt.Errorf("error creating scaled container: %w", err)
 	}
diff --git a/runc/task/shim.go b/runc/task/shim.go
new file mode 100644
index 0000000..efc4651
--- /dev/null
+++ b/runc/task/shim.go
@@ -0,0 +1,112 @@
+package task
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+
+	"github.com/containerd/containerd/runtime/v2/shim"
+	"github.com/containerd/log"
+	"github.com/containerd/ttrpc"
+	v1 "github.com/ctrox/zeropod/api/shim/v1"
+	"github.com/ctrox/zeropod/zeropod"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const ShimSocketPath = "/run/zeropod/s/"
+
+func shimSocketAddress(id string) string {
+	return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id))
+}
+
+func startShimServer(ctx context.Context, id string, task *wrapper) {
+	socket := shimSocketAddress(id)
+	listener, err := shim.NewSocket(socket)
+	if err != nil {
+		if !shim.SocketEaddrinuse(err) {
+			log.G(ctx).WithError(err).Error("listening to socket")
+			return
+		}
+
+		if shim.CanConnect(socket) {
+			log.G(ctx).Debug("shim socket already exists, skipping server start")
+			return
+		}
+
+		if err := shim.RemoveSocket(socket); err != nil {
+			log.G(ctx).WithError(err).Error("remove pre-existing socket")
+		}
+
+		listener, err = shim.NewSocket(socket)
+		if err != nil {
+			log.G(ctx).WithError(err).Error("failed to create shim listener")
+			return
+		}
+	}
+
+	log.G(ctx).Infof("starting shim server at %s", socket)
+	// write shim address to filesystem
+	if err := shim.WriteAddress("shim_address", socket); err != nil {
+		log.G(ctx).WithError(err).Errorf("failed to write shim address")
+		return
+	}
+
+	s, err := ttrpc.NewServer()
+	if err != nil {
+		log.G(ctx).WithError(err).Errorf("failed to create ttrpc server")
+		return
+	}
+	defer s.Close()
+
+	v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry(), task: task, events: task.zeropodEvents})
+
+	defer func() {
+		listener.Close()
+		_ = shim.RemoveSocket(socket)
+	}()
+	go s.Serve(ctx, listener)
+
+	<-ctx.Done()
+
+	log.G(ctx).Info("stopping shim server")
+}
+
+// shimService is an extension to the shim task service to provide
+// zeropod-specific functions like metrics.
+type shimService struct {
+	metrics *prometheus.Registry
+	task    *wrapper
+	events  chan *v1.ContainerStatus
+}
+
+// SubscribeStatus watches for shim events.
+func (s *shimService) SubscribeStatus(ctx context.Context, _ *v1.SubscribeStatusRequest, srv v1.Shim_SubscribeStatusServer) error {
+	for {
+		select {
+		case msg := <-s.events:
+			if err := srv.Send(msg); err != nil {
+				log.G(ctx).Errorf("unable to send event message: %s", err)
+			}
+		case <-ctx.Done():
+			return nil
+		}
+	}
+}
+
+// GetStatus returns the status of a zeropod container.
+func (s *shimService) GetStatus(ctx context.Context, req *v1.ContainerRequest) (*v1.ContainerStatus, error) {
+	container, ok := s.task.zeropodContainers[req.Id]
+	if !ok {
+		return nil, fmt.Errorf("could not find zeropod container with id: %s", req.Id)
+	}
+
+	return container.Status(), nil
+}
+
+// Metrics returns metrics of the zeropod shim instance.
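+// It gathers the shim-local Prometheus registry on demand, letting the
+// manager scrape all shims over ttrpc instead of per-shim HTTP sockets.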
+func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) { + mfs, err := s.metrics.Gather() + return &v1.MetricsResponse{Metrics: mfs}, err +} diff --git a/zeropod/container.go b/zeropod/container.go index e5c3d1b..4581de5 100644 --- a/zeropod/container.go +++ b/zeropod/container.go @@ -16,6 +16,7 @@ import ( "github.com/containerd/log" "github.com/containernetworking/plugins/pkg/ns" "github.com/ctrox/zeropod/activator" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/socket" ) @@ -38,6 +39,7 @@ type Container struct { tracker socket.Tracker preRestore func() HandleStartedFunc postRestore func(*runc.Container, HandleStartedFunc) + events chan *v1.ContainerStatus // mutex to lock during checkpoint/restore operations since concurrent // restores can cause cgroup confusion. This mutex is shared between all @@ -45,7 +47,7 @@ type Container struct { checkpointRestore *sync.Mutex } -func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform) (*Container, error) { +func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform, events chan *v1.ContainerStatus) (*Container, error) { p, err := container.Process("") if err != nil { return nil, errdefs.ToGRPC(err) @@ -89,9 +91,11 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta netNS: targetNS, tracker: tracker, checkpointRestore: cr, + events: events, } running.With(c.labels()).Set(1) + c.sendEvent(c.Status()) return c, c.initActivator(ctx) } @@ -159,6 +163,29 @@ func (c *Container) SetScaledDown(scaledDown bool) { running.With(c.labels()).Set(1) lastRestoreTime.With(c.labels()).Set(float64(time.Now().UnixNano())) } + c.sendEvent(c.Status()) +} + +func (c *Container) Status() *v1.ContainerStatus { + phase := v1.ContainerPhase_RUNNING + if c.ScaledDown() { + phase = v1.ContainerPhase_SCALED_DOWN + } + return &v1.ContainerStatus{ + Id: c.ID(), + Name: c.cfg.ContainerName, + PodName: c.cfg.PodName, + PodNamespace: c.cfg.PodNamespace, + Phase: phase, + } +} + +func (c *Container) sendEvent(event *v1.ContainerStatus) { + select { + case c.events <- event: + default: + log.G(c.context).Infof("channel full, discarding event: %v", event) + } } func (c *Container) ScaledDown() bool { diff --git a/zeropod/metrics.go b/zeropod/metrics.go index 3c27341..6a708d4 100644 --- a/zeropod/metrics.go +++ b/zeropod/metrics.go @@ -1,16 +1,7 @@ package zeropod import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - - "github.com/containerd/containerd/runtime/v2/shim" - "github.com/containerd/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -69,66 +60,7 @@ var ( }, commonLabels) ) -const MetricsSocketPath = "/run/zeropod/s/" - -func metricsSocketAddress(containerID string) string { - return fmt.Sprintf("unix://%s.sock", filepath.Join(MetricsSocketPath, containerID)) -} - -func StartMetricsServer(ctx context.Context, containerID string) { - metricsAddress := metricsSocketAddress(containerID) - listener, err := shim.NewSocket(metricsAddress) - if err != nil { - if !shim.SocketEaddrinuse(err) { - log.G(ctx).WithError(err) - return - } - - if shim.CanConnect(metricsAddress) { - log.G(ctx).Debug("metrics socket already exists, skipping server start") - return - } - - if err := shim.RemoveSocket(metricsAddress); err != nil { - log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) - } - - 
listener, err = shim.NewSocket(metricsAddress) - if err != nil { - log.G(ctx).WithError(err).Error("failed to create metrics listener") - } - } - - log.G(ctx).Infof("starting metrics server at %s", metricsAddress) - // write metrics address to filesystem - if err := shim.WriteAddress("metrics_address", metricsAddress); err != nil { - log.G(ctx).WithError(err).Errorf("failed to write metrics address") - return - } - - mux := http.NewServeMux() - handler := promhttp.HandlerFor( - newRegistry(), - promhttp.HandlerOpts{ - EnableOpenMetrics: false, - }, - ) - - mux.Handle("/metrics", handler) - - server := http.Server{Handler: mux} - - go server.Serve(listener) - - <-ctx.Done() - - log.G(ctx).Info("stopping metrics server") - listener.Close() - server.Close() - _ = os.RemoveAll(metricsAddress) -} - -func newRegistry() *prometheus.Registry { +func NewRegistry() *prometheus.Registry { reg := prometheus.NewRegistry() reg.MustRegister(
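
For context, this is roughly how a manager-side consumer of the new `Metrics`
RPC could look. This is a minimal sketch only: the name `collectShimMetrics`,
the skip-on-error policy and the socket discovery loop are illustrative
assumptions, not code from this change.

```go
package manager

import (
	"context"
	"net"
	"os"
	"path/filepath"

	"github.com/containerd/ttrpc"
	v1 "github.com/ctrox/zeropod/api/shim/v1"
	"github.com/ctrox/zeropod/runc/task"
	dto "github.com/prometheus/client_model/go"
	"google.golang.org/protobuf/types/known/emptypb"
)

// collectShimMetrics dials every shim socket and merges the metric families
// returned by the Metrics RPC. Shims that cannot be reached are skipped
// instead of failing the whole scrape (an assumption of this sketch).
func collectShimMetrics(ctx context.Context) ([]*dto.MetricFamily, error) {
	socks, err := os.ReadDir(task.ShimSocketPath)
	if err != nil {
		return nil, err
	}

	var mfs []*dto.MetricFamily
	for _, sock := range socks {
		conn, err := net.Dial("unix", filepath.Join(task.ShimSocketPath, sock.Name()))
		if err != nil {
			continue // the shim may have just exited
		}
		tc := ttrpc.NewClient(conn)
		// as with SubscribeStatus, the Empty field is set explicitly
		resp, err := tc.Metrics(ctx, &v1.MetricsRequest{Empty: &emptypb.Empty{}})
		tc.Close()
		if err != nil {
			continue
		}
		mfs = append(mfs, resp.Metrics...)
	}
	return mfs, nil
}
```

The merged families could then be served on the manager's `-metrics-addr`
endpoint, for example by wrapping a gather function in
`prometheus.GathererFunc` and passing it to `promhttp.HandlerFor`.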