From 5a18258fb73b623778290527c597186dc018e01b Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 12 May 2024 14:32:13 +0200 Subject: [PATCH 1/6] feat: use ttrpc for shim metrics Instead of serving the metrics over HTTP, we now use ttrpc and implement a more generic "shim service" that can be used for different purposes than just metrics. --- Makefile | 6 + api/shim/v1/shim.pb.go | 228 +++++++++++++++++++++++++++++++++++ api/shim/v1/shim.proto | 19 +++ api/shim/v1/shim_ttrpc.pb.go | 44 +++++++ go.mod | 6 +- go.sum | 8 +- manager/metrics_collector.go | 62 ++++------ runc/task/service_zeropod.go | 2 +- runc/task/shim.go | 86 +++++++++++++ zeropod/metrics.go | 70 +---------- 10 files changed, 416 insertions(+), 115 deletions(-) create mode 100644 api/shim/v1/shim.pb.go create mode 100644 api/shim/v1/shim.proto create mode 100644 api/shim/v1/shim_ttrpc.pb.go create mode 100644 runc/task/shim.go diff --git a/Makefile b/Makefile index 06a1981..4d88d3f 100644 --- a/Makefile +++ b/Makefile @@ -83,6 +83,12 @@ generate: export BPF_CFLAGS := $(CFLAGS) generate: docker run --rm -v $(PWD):/app:Z --user $(shell id -u):$(shell id -g) --env=BPF_CLANG="$(CLANG)" --env=BPF_CFLAGS="$(CFLAGS)" $(EBPF_IMAGE) +ttrpc: + cd api/shim/v1; protoc --go_out=. --go_opt=paths=source_relative \ + --ttrpc_out=. --plugin=protoc-gen-ttrpc=`which protoc-gen-go-ttrpc` \ + --ttrpc_opt=paths=source_relative *.proto -I. \ + -I ${GOPATH}/pkg/mod/github.com/prometheus/client_model@v0.5.0 + # to improve reproducibility of the bpf builds, we dump the vmlinux.h and # store it compressed in git instead of dumping it during the build. update-vmlinux: diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go new file mode 100644 index 0000000..c6ffd17 --- /dev/null +++ b/api/shim/v1/shim.pb.go @@ -0,0 +1,228 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. 
+// versions: +// protoc-gen-go v1.31.0 +// protoc v3.19.6 +// source: shim.proto + +package v1 + +import ( + _go "github.com/prometheus/client_model/go" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type MetricsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metrics []*_go.MetricFamily `protobuf:"bytes,1,rep,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *MetricsResponse) Reset() { + *x = MetricsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsResponse) ProtoMessage() {} + +func (x *MetricsResponse) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. 
+func (*MetricsResponse) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { + if x != nil { + return x.Metrics + } + return nil +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. 
+func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{1} +} + +func (x *MetricsRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +var File_shim_proto protoreflect.FileDescriptor + +var file_shim_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x7a, 0x65, + 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, + 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x22, 0x69, 0x6f, 0x2f, 0x70, + 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, + 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, + 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, + 0x3e, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x32, + 0x54, 0x0a, 0x04, 0x53, 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, + 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, + 0x6d, 0x2e, 0x76, 0x31, 
0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, + 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, + 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, + 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_shim_proto_rawDescOnce sync.Once + file_shim_proto_rawDescData = file_shim_proto_rawDesc +) + +func file_shim_proto_rawDescGZIP() []byte { + file_shim_proto_rawDescOnce.Do(func() { + file_shim_proto_rawDescData = protoimpl.X.CompressGZIP(file_shim_proto_rawDescData) + }) + return file_shim_proto_rawDescData +} + +var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_shim_proto_goTypes = []interface{}{ + (*MetricsResponse)(nil), // 0: zeropod.shim.v1.MetricsResponse + (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest + (*_go.MetricFamily)(nil), // 2: io.prometheus.client.MetricFamily + (*emptypb.Empty)(nil), // 3: google.protobuf.Empty +} +var file_shim_proto_depIdxs = []int32{ + 2, // 0: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily + 3, // 1: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty + 1, // 2: zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest + 0, // 3: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse + 3, // [3:4] is the sub-list for method output_type + 2, // [2:3] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { 
file_shim_proto_init() } +func file_shim_proto_init() { + if File_shim_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_shim_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_shim_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_shim_proto_goTypes, + DependencyIndexes: file_shim_proto_depIdxs, + MessageInfos: file_shim_proto_msgTypes, + }.Build() + File_shim_proto = out.File + file_shim_proto_rawDesc = nil + file_shim_proto_goTypes = nil + file_shim_proto_depIdxs = nil +} diff --git a/api/shim/v1/shim.proto b/api/shim/v1/shim.proto new file mode 100644 index 0000000..f46989f --- /dev/null +++ b/api/shim/v1/shim.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package zeropod.shim.v1; +option go_package = "github.com/ctrox/zeropod/api/shim/v1/;v1"; + +import "google/protobuf/empty.proto"; +import "io/prometheus/client/metrics.proto"; + +service Shim { + rpc Metrics(MetricsRequest) returns (MetricsResponse); +} + +message MetricsResponse { + repeated io.prometheus.client.MetricFamily metrics = 1; +} + +message MetricsRequest { + google.protobuf.Empty empty = 1; +} diff --git a/api/shim/v1/shim_ttrpc.pb.go b/api/shim/v1/shim_ttrpc.pb.go new file mode 100644 index 0000000..76a8f7e --- /dev/null +++ b/api/shim/v1/shim_ttrpc.pb.go @@ -0,0 +1,44 @@ +// Code generated by protoc-gen-go-ttrpc. DO NOT EDIT. 
+// source: shim.proto +package v1 + +import ( + context "context" + ttrpc "github.com/containerd/ttrpc" +) + +type ShimService interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) +} + +func RegisterShimService(srv *ttrpc.Server, svc ShimService) { + srv.RegisterService("zeropod.shim.v1.Shim", &ttrpc.ServiceDesc{ + Methods: map[string]ttrpc.Method{ + "Metrics": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req MetricsRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Metrics(ctx, &req) + }, + }, + }) +} + +type shimClient struct { + client *ttrpc.Client +} + +func NewShimClient(client *ttrpc.Client) ShimService { + return &shimClient{ + client: client, + } +} + +func (c *shimClient) Metrics(ctx context.Context, req *MetricsRequest) (*MetricsResponse, error) { + var resp MetricsResponse + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "Metrics", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} diff --git a/go.mod b/go.mod index a51322e..8cc6cf4 100644 --- a/go.mod +++ b/go.mod @@ -27,8 +27,9 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 github.com/vishvananda/netlink v1.2.1-beta.2 + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/sys v0.19.0 - google.golang.org/protobuf v1.33.0 + google.golang.org/protobuf v1.34.1 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 @@ -69,7 +70,7 @@ require ( github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -116,7 +117,6 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 // 
indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect - golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect diff --git a/go.sum b/go.sum index 5611955..f678aad 100644 --- a/go.sum +++ b/go.sum @@ -135,8 +135,8 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -485,8 +485,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 
h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/manager/metrics_collector.go b/manager/metrics_collector.go index f56bd1f..65988e7 100644 --- a/manager/metrics_collector.go +++ b/manager/metrics_collector.go @@ -1,19 +1,21 @@ package manager import ( - "fmt" + "context" "io" "log/slog" "net" "net/http" "os" "path/filepath" - "sort" + "slices" - "github.com/ctrox/zeropod/zeropod" + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/runc/task" dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" + "golang.org/x/exp/maps" ) func Handler(w http.ResponseWriter, req *http.Request) { @@ -23,42 +25,40 @@ func Handler(w http.ResponseWriter, req *http.Request) { // fetchMetricsAndMerge gets metrics from each socket, merges them together // and writes them to w. 
func fetchMetricsAndMerge(w io.Writer) { - socks, err := os.ReadDir(zeropod.MetricsSocketPath) + socks, err := os.ReadDir(task.ShimSocketPath) if err != nil { - slog.Error("error listing file in metrics socket path", "path", zeropod.MetricsSocketPath, "err", err) + slog.Error("error listing file in shim socket path", "path", task.ShimSocketPath, "err", err) return } mfs := map[string]*dto.MetricFamily{} for _, sock := range socks { - sockName := filepath.Join(zeropod.MetricsSocketPath, sock.Name()) - slog.Info("reading sock", "name", sockName) + sockName := filepath.Join(task.ShimSocketPath, sock.Name()) + slog.Debug("getting metrics", "name", sockName) - res, err := getMetrics(sockName) + shimMetrics, err := getMetricsOverTTRPC(context.Background(), sockName) if err != nil { slog.Error("getting metrics", "err", err) // we still want to read the rest of the sockets continue } + for _, mf := range shimMetrics { + if mf.Name == nil { + continue + } - for n, mf := range res { - mfo, ok := mfs[n] + mfo, ok := mfs[*mf.Name] if ok { mfo.Metric = append(mfo.Metric, mf.Metric...) 
} else { - mfs[n] = mf + mfs[*mf.Name] = mf } } } - - names := []string{} - for n := range mfs { - names = append(names, n) - } - sort.Strings(names) - + keys := maps.Keys(mfs) + slices.Sort(keys) enc := expfmt.NewEncoder(w, expfmt.FmtText) - for _, n := range names { + for _, n := range keys { err := enc.Encode(mfs[n]) if err != nil { slog.Error("encoding metrics", "err", err) @@ -67,30 +67,16 @@ func fetchMetricsAndMerge(w io.Writer) { } } -func getMetrics(sock string) (map[string]*dto.MetricFamily, error) { - tr := &http.Transport{ - Dial: func(proto, addr string) (conn net.Conn, err error) { - return net.Dial("unix", sock) - }, - } - - client := &http.Client{Transport: tr} - - // the host does not seem to matter when using unix sockets - resp, err := client.Get("http://localhost/metrics") +func getMetricsOverTTRPC(ctx context.Context, sock string) ([]*dto.MetricFamily, error) { + conn, err := net.Dial("unix", sock) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("expected status 200, got %v", resp.StatusCode) - } - - var parser expfmt.TextParser - mfs, err := parser.TextToMetricFamilies(resp.Body) + resp, err := v1.NewShimClient(ttrpc.NewClient(conn)).Metrics(ctx, &v1.MetricsRequest{}) if err != nil { return nil, err } - return mfs, nil + return resp.Metrics, nil } diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go index 1f022e1..fadd71b 100644 --- a/runc/task/service_zeropod.go +++ b/runc/task/service_zeropod.go @@ -80,7 +80,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow return shim.RemoveSocket(address) }) - go zeropod.StartMetricsServer(ctx, filepath.Base(address)) + go startShimServer(ctx, filepath.Base(address)) return w, nil } diff --git a/runc/task/shim.go b/runc/task/shim.go new file mode 100644 index 0000000..4391ebd --- /dev/null +++ b/runc/task/shim.go @@ -0,0 +1,86 @@ +package task + +import ( + "context" + "fmt" + "os" + "path/filepath" + 
+ "github.com/containerd/containerd/runtime/v2/shim" + "github.com/containerd/log" + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/zeropod" + "github.com/prometheus/client_golang/prometheus" +) + +const ShimSocketPath = "/run/zeropod/s/" + +func shimSocketAddress(id string) string { + return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id)) +} + +func startShimServer(ctx context.Context, id string) { + socket := shimSocketAddress(id) + listener, err := shim.NewSocket(socket) + if err != nil { + if !shim.SocketEaddrinuse(err) { + log.G(ctx).WithError(err) + return + } + + if shim.CanConnect(socket) { + log.G(ctx).Debug("shim socket already exists, skipping server start") + return + } + + if err := shim.RemoveSocket(socket); err != nil { + log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) + } + + listener, err = shim.NewSocket(socket) + if err != nil { + log.G(ctx).WithError(err).Error("failed to create shim listener") + } + } + + log.G(ctx).Infof("starting shim server at %s", socket) + // write shim address to filesystem + if err := shim.WriteAddress("shim_address", socket); err != nil { + log.G(ctx).WithError(err).Errorf("failed to write shim address") + return + } + + s, err := ttrpc.NewServer() + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to create ttrpc server") + return + } + defer s.Close() + v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry()}) + + defer func() { + listener.Close() + os.Remove(socket) + }() + go s.Serve(ctx, listener) + + <-ctx.Done() + + log.G(ctx).Info("stopping metrics server") + listener.Close() + s.Close() + _ = os.RemoveAll(socket) +} + +// shimService is an extension to the shim task service to provide +// zeropod-specific functions like metrics. +type shimService struct { + metrics *prometheus.Registry +} + +// Metrics implements v1.ShimService. 
+func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) { + mfs, err := s.metrics.Gather() + return &v1.MetricsResponse{Metrics: mfs}, err +} diff --git a/zeropod/metrics.go b/zeropod/metrics.go index 3c27341..6a708d4 100644 --- a/zeropod/metrics.go +++ b/zeropod/metrics.go @@ -1,16 +1,7 @@ package zeropod import ( - "context" - "fmt" - "net/http" - "os" - "path/filepath" - - "github.com/containerd/containerd/runtime/v2/shim" - "github.com/containerd/log" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) const ( @@ -69,66 +60,7 @@ var ( }, commonLabels) ) -const MetricsSocketPath = "/run/zeropod/s/" - -func metricsSocketAddress(containerID string) string { - return fmt.Sprintf("unix://%s.sock", filepath.Join(MetricsSocketPath, containerID)) -} - -func StartMetricsServer(ctx context.Context, containerID string) { - metricsAddress := metricsSocketAddress(containerID) - listener, err := shim.NewSocket(metricsAddress) - if err != nil { - if !shim.SocketEaddrinuse(err) { - log.G(ctx).WithError(err) - return - } - - if shim.CanConnect(metricsAddress) { - log.G(ctx).Debug("metrics socket already exists, skipping server start") - return - } - - if err := shim.RemoveSocket(metricsAddress); err != nil { - log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) - } - - listener, err = shim.NewSocket(metricsAddress) - if err != nil { - log.G(ctx).WithError(err).Error("failed to create metrics listener") - } - } - - log.G(ctx).Infof("starting metrics server at %s", metricsAddress) - // write metrics address to filesystem - if err := shim.WriteAddress("metrics_address", metricsAddress); err != nil { - log.G(ctx).WithError(err).Errorf("failed to write metrics address") - return - } - - mux := http.NewServeMux() - handler := promhttp.HandlerFor( - newRegistry(), - promhttp.HandlerOpts{ - EnableOpenMetrics: false, - }, - ) - - mux.Handle("/metrics", handler) - - 
server := http.Server{Handler: mux} - - go server.Serve(listener) - - <-ctx.Done() - - log.G(ctx).Info("stopping metrics server") - listener.Close() - server.Close() - _ = os.RemoveAll(metricsAddress) -} - -func newRegistry() *prometheus.Registry { +func NewRegistry() *prometheus.Registry { reg := prometheus.NewRegistry() reg.MustRegister( From 9b6091c696c3daf69978cf98e1c648eb9adb9245 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Mon, 13 May 2024 20:50:26 +0200 Subject: [PATCH 2/6] feat: add status subscription service the shim ttrpc service now exposes a SubscribeStatus function which the manager subscribes to for receiving zeropod status updates, such as when checkpointing and restoring. --- api/shim/v1/shim.pb.go | 409 ++++++++++++++++++++++++++++++----- api/shim/v1/shim.proto | 27 ++- api/shim/v1/shim_ttrpc.pb.go | 80 ++++++- cmd/manager/main.go | 5 + manager/status.go | 101 +++++++++ runc/task/service_zeropod.go | 9 +- runc/task/shim.go | 39 +++- zeropod/container.go | 29 ++- 8 files changed, 631 insertions(+), 68 deletions(-) create mode 100644 manager/status.go diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go index c6ffd17..a60b8af 100644 --- a/api/shim/v1/shim.pb.go +++ b/api/shim/v1/shim.pb.go @@ -22,6 +22,146 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ContainerPhase int32 + +const ( + ContainerPhase_SCALED_DOWN ContainerPhase = 0 + ContainerPhase_RUNNING ContainerPhase = 1 +) + +// Enum value maps for ContainerPhase. 
+var ( + ContainerPhase_name = map[int32]string{ + 0: "SCALED_DOWN", + 1: "RUNNING", + } + ContainerPhase_value = map[string]int32{ + "SCALED_DOWN": 0, + "RUNNING": 1, + } +) + +func (x ContainerPhase) Enum() *ContainerPhase { + p := new(ContainerPhase) + *p = x + return p +} + +func (x ContainerPhase) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ContainerPhase) Descriptor() protoreflect.EnumDescriptor { + return file_shim_proto_enumTypes[0].Descriptor() +} + +func (ContainerPhase) Type() protoreflect.EnumType { + return &file_shim_proto_enumTypes[0] +} + +func (x ContainerPhase) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ContainerPhase.Descriptor instead. +func (ContainerPhase) EnumDescriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +type MetricsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *MetricsRequest) Reset() { + *x = MetricsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MetricsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsRequest) ProtoMessage() {} + +func (x *MetricsRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. 
+func (*MetricsRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{0} +} + +func (x *MetricsRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + +type SubscribeStatusRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` +} + +func (x *SubscribeStatusRequest) Reset() { + *x = SubscribeStatusRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeStatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeStatusRequest) ProtoMessage() {} + +func (x *SubscribeStatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeStatusRequest.ProtoReflect.Descriptor instead. 
+func (*SubscribeStatusRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{1} +} + +func (x *SubscribeStatusRequest) GetEmpty() *emptypb.Empty { + if x != nil { + return x.Empty + } + return nil +} + type MetricsResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -33,7 +173,7 @@ type MetricsResponse struct { func (x *MetricsResponse) Reset() { *x = MetricsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_shim_proto_msgTypes[0] + mi := &file_shim_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -46,7 +186,7 @@ func (x *MetricsResponse) String() string { func (*MetricsResponse) ProtoMessage() {} func (x *MetricsResponse) ProtoReflect() protoreflect.Message { - mi := &file_shim_proto_msgTypes[0] + mi := &file_shim_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -59,7 +199,7 @@ func (x *MetricsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsResponse.ProtoReflect.Descriptor instead. 
func (*MetricsResponse) Descriptor() ([]byte, []int) { - return file_shim_proto_rawDescGZIP(), []int{0} + return file_shim_proto_rawDescGZIP(), []int{2} } func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { @@ -69,31 +209,31 @@ func (x *MetricsResponse) GetMetrics() []*_go.MetricFamily { return nil } -type MetricsRequest struct { +type ContainerRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Empty *emptypb.Empty `protobuf:"bytes,1,opt,name=empty,proto3" json:"empty,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` } -func (x *MetricsRequest) Reset() { - *x = MetricsRequest{} +func (x *ContainerRequest) Reset() { + *x = ContainerRequest{} if protoimpl.UnsafeEnabled { - mi := &file_shim_proto_msgTypes[1] + mi := &file_shim_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *MetricsRequest) String() string { +func (x *ContainerRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*MetricsRequest) ProtoMessage() {} +func (*ContainerRequest) ProtoMessage() {} -func (x *MetricsRequest) ProtoReflect() protoreflect.Message { - mi := &file_shim_proto_msgTypes[1] +func (x *ContainerRequest) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -104,16 +244,95 @@ func (x *MetricsRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use MetricsRequest.ProtoReflect.Descriptor instead. -func (*MetricsRequest) Descriptor() ([]byte, []int) { - return file_shim_proto_rawDescGZIP(), []int{1} +// Deprecated: Use ContainerRequest.ProtoReflect.Descriptor instead. 
+func (*ContainerRequest) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{3} } -func (x *MetricsRequest) GetEmpty() *emptypb.Empty { +func (x *ContainerRequest) GetId() string { if x != nil { - return x.Empty + return x.Id } - return nil + return "" +} + +type ContainerStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + PodName string `protobuf:"bytes,3,opt,name=pod_name,json=podName,proto3" json:"pod_name,omitempty"` + PodNamespace string `protobuf:"bytes,4,opt,name=pod_namespace,json=podNamespace,proto3" json:"pod_namespace,omitempty"` + Phase ContainerPhase `protobuf:"varint,5,opt,name=phase,proto3,enum=zeropod.shim.v1.ContainerPhase" json:"phase,omitempty"` +} + +func (x *ContainerStatus) Reset() { + *x = ContainerStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_shim_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ContainerStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ContainerStatus) ProtoMessage() {} + +func (x *ContainerStatus) ProtoReflect() protoreflect.Message { + mi := &file_shim_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ContainerStatus.ProtoReflect.Descriptor instead. 
+func (*ContainerStatus) Descriptor() ([]byte, []int) { + return file_shim_proto_rawDescGZIP(), []int{4} +} + +func (x *ContainerStatus) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ContainerStatus) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ContainerStatus) GetPodName() string { + if x != nil { + return x.PodName + } + return "" +} + +func (x *ContainerStatus) GetPodNamespace() string { + if x != nil { + return x.PodNamespace + } + return "" +} + +func (x *ContainerStatus) GetPhase() ContainerPhase { + if x != nil { + return x.Phase + } + return ContainerPhase_SCALED_DOWN } var File_shim_proto protoreflect.FileDescriptor @@ -124,25 +343,57 @@ var file_shim_proto_rawDesc = []byte{ 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x22, 0x69, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, - 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, - 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, - 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, - 0x3e, 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 
0x6d, 0x70, 0x74, 0x79, 0x32, - 0x54, 0x0a, 0x04, 0x53, 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x12, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, - 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, - 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, - 0x64, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x3e, + 0x0a, 0x0e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x46, + 0x0a, 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x05, 0x65, 0x6d, 0x70, 0x74, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, + 0x05, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x4f, 0x0a, 0x0f, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x03, 
0x28, 0x0b, 0x32, 0x22, 0x2e, 0x69, 0x6f, 0x2e, + 0x70, 0x72, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x65, 0x75, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, + 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x22, 0x0a, 0x10, 0x43, 0x6f, 0x6e, 0x74, 0x61, + 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xac, 0x01, 0x0a, 0x0f, + 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, + 0x0a, 0x0d, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x1f, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, + 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, + 0x61, 0x73, 0x65, 0x52, 0x05, 0x70, 0x68, 0x61, 0x73, 0x65, 0x2a, 0x2e, 0x0a, 0x0e, 0x43, 0x6f, + 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x50, 0x68, 0x61, 0x73, 0x65, 0x12, 0x0f, 0x0a, 0x0b, + 0x53, 0x43, 0x41, 0x4c, 0x45, 0x44, 0x5f, 0x44, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x32, 0x86, 0x02, 0x0a, 0x04, 0x53, + 0x68, 0x69, 0x6d, 0x12, 0x4c, 0x0a, 0x07, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 
0x12, 0x1f, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x50, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, + 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, + 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, + 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x12, 0x5e, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, + 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x20, 0x2e, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2e, 0x73, 0x68, 0x69, 0x6d, 0x2e, 0x76, + 0x31, 0x2e, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x63, 0x74, 0x72, 0x6f, 0x78, 0x2f, 0x7a, 0x65, 0x72, 0x6f, 0x70, 0x6f, 0x64, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x69, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x3b, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -157,23 +408,34 @@ func file_shim_proto_rawDescGZIP() []byte { return file_shim_proto_rawDescData } -var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 2) 
+var file_shim_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_shim_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_shim_proto_goTypes = []interface{}{ - (*MetricsResponse)(nil), // 0: zeropod.shim.v1.MetricsResponse - (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest - (*_go.MetricFamily)(nil), // 2: io.prometheus.client.MetricFamily - (*emptypb.Empty)(nil), // 3: google.protobuf.Empty + (ContainerPhase)(0), // 0: zeropod.shim.v1.ContainerPhase + (*MetricsRequest)(nil), // 1: zeropod.shim.v1.MetricsRequest + (*SubscribeStatusRequest)(nil), // 2: zeropod.shim.v1.SubscribeStatusRequest + (*MetricsResponse)(nil), // 3: zeropod.shim.v1.MetricsResponse + (*ContainerRequest)(nil), // 4: zeropod.shim.v1.ContainerRequest + (*ContainerStatus)(nil), // 5: zeropod.shim.v1.ContainerStatus + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*_go.MetricFamily)(nil), // 7: io.prometheus.client.MetricFamily } var file_shim_proto_depIdxs = []int32{ - 2, // 0: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily - 3, // 1: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty - 1, // 2: zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest - 0, // 3: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse - 3, // [3:4] is the sub-list for method output_type - 2, // [2:3] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 6, // 0: zeropod.shim.v1.MetricsRequest.empty:type_name -> google.protobuf.Empty + 6, // 1: zeropod.shim.v1.SubscribeStatusRequest.empty:type_name -> google.protobuf.Empty + 7, // 2: zeropod.shim.v1.MetricsResponse.metrics:type_name -> io.prometheus.client.MetricFamily + 0, // 3: zeropod.shim.v1.ContainerStatus.phase:type_name -> zeropod.shim.v1.ContainerPhase + 1, // 4: 
zeropod.shim.v1.Shim.Metrics:input_type -> zeropod.shim.v1.MetricsRequest + 4, // 5: zeropod.shim.v1.Shim.GetStatus:input_type -> zeropod.shim.v1.ContainerRequest + 2, // 6: zeropod.shim.v1.Shim.SubscribeStatus:input_type -> zeropod.shim.v1.SubscribeStatusRequest + 3, // 7: zeropod.shim.v1.Shim.Metrics:output_type -> zeropod.shim.v1.MetricsResponse + 5, // 8: zeropod.shim.v1.Shim.GetStatus:output_type -> zeropod.shim.v1.ContainerStatus + 5, // 9: zeropod.shim.v1.Shim.SubscribeStatus:output_type -> zeropod.shim.v1.ContainerStatus + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_shim_proto_init() } @@ -183,7 +445,7 @@ func file_shim_proto_init() { } if !protoimpl.UnsafeEnabled { file_shim_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsResponse); i { + switch v := v.(*MetricsRequest); i { case 0: return &v.state case 1: @@ -195,7 +457,43 @@ func file_shim_proto_init() { } } file_shim_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsRequest); i { + switch v := v.(*SubscribeStatusRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shim_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + 
file_shim_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ContainerStatus); i { case 0: return &v.state case 1: @@ -212,13 +510,14 @@ func file_shim_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_shim_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, + NumEnums: 1, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, GoTypes: file_shim_proto_goTypes, DependencyIndexes: file_shim_proto_depIdxs, + EnumInfos: file_shim_proto_enumTypes, MessageInfos: file_shim_proto_msgTypes, }.Build() File_shim_proto = out.File diff --git a/api/shim/v1/shim.proto b/api/shim/v1/shim.proto index f46989f..3c2cc33 100644 --- a/api/shim/v1/shim.proto +++ b/api/shim/v1/shim.proto @@ -8,12 +8,35 @@ import "io/prometheus/client/metrics.proto"; service Shim { rpc Metrics(MetricsRequest) returns (MetricsResponse); + rpc GetStatus(ContainerRequest) returns (ContainerStatus); + rpc SubscribeStatus(SubscribeStatusRequest) returns (stream ContainerStatus); +} + +message MetricsRequest { + google.protobuf.Empty empty = 1; +} + +message SubscribeStatusRequest { + google.protobuf.Empty empty = 1; } message MetricsResponse { repeated io.prometheus.client.MetricFamily metrics = 1; } -message MetricsRequest { - google.protobuf.Empty empty = 1; +message ContainerRequest { + string id = 1; +} + +enum ContainerPhase { + SCALED_DOWN = 0; + RUNNING = 1; +} + +message ContainerStatus { + string id = 1; + string name = 2; + string pod_name = 3; + string pod_namespace = 4; + ContainerPhase phase = 5; } diff --git a/api/shim/v1/shim_ttrpc.pb.go b/api/shim/v1/shim_ttrpc.pb.go index 76a8f7e..de3f60d 100644 --- a/api/shim/v1/shim_ttrpc.pb.go +++ b/api/shim/v1/shim_ttrpc.pb.go @@ -9,6 +9,21 @@ import ( type ShimService interface { Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, 
*SubscribeStatusRequest, Shim_SubscribeStatusServer) error +} + +type Shim_SubscribeStatusServer interface { + Send(*ContainerStatus) error + ttrpc.StreamServer +} + +type shimSubscribeStatusServer struct { + ttrpc.StreamServer +} + +func (x *shimSubscribeStatusServer) Send(m *ContainerStatus) error { + return x.StreamServer.SendMsg(m) } func RegisterShimService(srv *ttrpc.Server, svc ShimService) { @@ -21,15 +36,41 @@ func RegisterShimService(srv *ttrpc.Server, svc ShimService) { } return svc.Metrics(ctx, &req) }, + "GetStatus": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ContainerRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.GetStatus(ctx, &req) + }, + }, + Streams: map[string]ttrpc.Stream{ + "SubscribeStatus": { + Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) { + m := new(SubscribeStatusRequest) + if err := stream.RecvMsg(m); err != nil { + return nil, err + } + return nil, svc.SubscribeStatus(ctx, m, &shimSubscribeStatusServer{stream}) + }, + StreamingClient: false, + StreamingServer: true, + }, }, }) } +type ShimClient interface { + Metrics(context.Context, *MetricsRequest) (*MetricsResponse, error) + GetStatus(context.Context, *ContainerRequest) (*ContainerStatus, error) + SubscribeStatus(context.Context, *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) +} + type shimClient struct { client *ttrpc.Client } -func NewShimClient(client *ttrpc.Client) ShimService { +func NewShimClient(client *ttrpc.Client) ShimClient { return &shimClient{ client: client, } @@ -42,3 +83,40 @@ func (c *shimClient) Metrics(ctx context.Context, req *MetricsRequest) (*Metrics } return &resp, nil } + +func (c *shimClient) GetStatus(ctx context.Context, req *ContainerRequest) (*ContainerStatus, error) { + var resp ContainerStatus + if err := c.client.Call(ctx, "zeropod.shim.v1.Shim", "GetStatus", req, &resp); err != nil { + return nil, err + } + return 
&resp, nil +} + +func (c *shimClient) SubscribeStatus(ctx context.Context, req *SubscribeStatusRequest) (Shim_SubscribeStatusClient, error) { + stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{ + StreamingClient: false, + StreamingServer: true, + }, "zeropod.shim.v1.Shim", "SubscribeStatus", req) + if err != nil { + return nil, err + } + x := &shimSubscribeStatusClient{stream} + return x, nil +} + +type Shim_SubscribeStatusClient interface { + Recv() (*ContainerStatus, error) + ttrpc.ClientStream +} + +type shimSubscribeStatusClient struct { + ttrpc.ClientStream +} + +func (x *shimSubscribeStatusClient) Recv() (*ContainerStatus, error) { + m := new(ContainerStatus) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index bec987d..ae36861 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -36,6 +36,11 @@ func main() { os.Exit(1) } + if err := manager.StartSubscribers(ctx); err != nil { + slog.Error("starting subscribers", "err", err) + os.Exit(1) + } + server := &http.Server{Addr: *metricsAddr} http.HandleFunc("/metrics", manager.Handler) diff --git a/manager/status.go b/manager/status.go new file mode 100644 index 0000000..88248af --- /dev/null +++ b/manager/status.go @@ -0,0 +1,101 @@ +package manager + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net" + "os" + "path/filepath" + + "github.com/containerd/ttrpc" + v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/ctrox/zeropod/runc/task" + "github.com/fsnotify/fsnotify" + "google.golang.org/protobuf/types/known/emptypb" +) + +func StartSubscribers(ctx context.Context) error { + socks, err := os.ReadDir(task.ShimSocketPath) + if err != nil { + return fmt.Errorf("error listing file in shim socket path: %s", err) + } + + for _, sock := range socks { + sock := sock + go func() { + if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name())); err != nil { + 
slog.Error("error subscribing", "sock", sock.Name(), "err", err) + } + }() + } + + go watchForShims(ctx) + + return nil +} + +func subscribe(ctx context.Context, sock string) error { + log := slog.With("sock", sock) + log.Info("subscribing to status events") + + conn, err := net.Dial("unix", sock) + if err != nil { + return err + } + + shimClient := v1.NewShimClient(ttrpc.NewClient(conn)) + // not sure why but the emptypb needs to be set in order for the subscribe + // to be received + client, err := shimClient.SubscribeStatus(ctx, &v1.SubscribeStatusRequest{Empty: &emptypb.Empty{}}) + if err != nil { + return err + } + + for { + status, err := client.Recv() + if err != nil { + if err == io.EOF || errors.Is(err, ttrpc.ErrClosed) { + log.Info("subscribe closed") + } else { + log.Error("subscribe closed", "err", err) + } + break + } + slog.Info("received status", + "container", status.Name, "pod", status.PodName, + "namespace", status.PodNamespace, "phase", status.Phase) + } + + return nil +} + +func watchForShims(ctx context.Context) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer watcher.Close() + + if err := watcher.Add(task.ShimSocketPath); err != nil { + return err + } + + for { + select { + case event := <-watcher.Events: + switch event.Op { + case fsnotify.Create: + if err := subscribe(ctx, event.Name); err != nil { + slog.Error("error subscribing", "sock", event.Name, "err", err) + } + } + case err := <-watcher.Errors: + slog.Error("watch error", "err", err) + case <-ctx.Done(): + return nil + } + } +} diff --git a/runc/task/service_zeropod.go b/runc/task/service_zeropod.go index fadd71b..9af5be1 100644 --- a/runc/task/service_zeropod.go +++ b/runc/task/service_zeropod.go @@ -8,6 +8,7 @@ import ( "sync" "time" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/zeropod" "google.golang.org/protobuf/types/known/emptypb" @@ -58,8 +59,9 @@ func NewZeropodService(ctx context.Context, publisher 
shim.Publisher, sd shutdow } w := &wrapper{ service: s, - zeropodContainers: make(map[string]*zeropod.Container), checkpointRestore: sync.Mutex{}, + zeropodContainers: make(map[string]*zeropod.Container), + zeropodEvents: make(chan *v1.ContainerStatus, 128), } go w.processExits() runcC.Monitor = reaper.Default @@ -80,7 +82,7 @@ func NewZeropodService(ctx context.Context, publisher shim.Publisher, sd shutdow return shim.RemoveSocket(address) }) - go startShimServer(ctx, filepath.Base(address)) + go startShimServer(ctx, filepath.Base(address), w.zeropodEvents) return w, nil } @@ -91,6 +93,7 @@ type wrapper struct { mut sync.Mutex checkpointRestore sync.Mutex zeropodContainers map[string]*zeropod.Container + zeropodEvents chan *v1.ContainerStatus } func (w *wrapper) RegisterTTRPC(server *ttrpc.Server) error { @@ -136,7 +139,7 @@ func (w *wrapper) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. log.G(ctx).Infof("creating zeropod container: %s", cfg.ContainerName) - zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform) + zeropodContainer, err := zeropod.New(w.context, cfg, &w.checkpointRestore, container, w.platform, w.zeropodEvents) if err != nil { return nil, fmt.Errorf("error creating scaled container: %w", err) } diff --git a/runc/task/shim.go b/runc/task/shim.go index 4391ebd..efc4651 100644 --- a/runc/task/shim.go +++ b/runc/task/shim.go @@ -20,12 +20,12 @@ func shimSocketAddress(id string) string { return fmt.Sprintf("unix://%s.sock", filepath.Join(ShimSocketPath, id)) } -func startShimServer(ctx context.Context, id string) { +func startShimServer(ctx context.Context, id string, events chan *v1.ContainerStatus) { socket := shimSocketAddress(id) listener, err := shim.NewSocket(socket) if err != nil { if !shim.SocketEaddrinuse(err) { - log.G(ctx).WithError(err) + log.G(ctx).WithError(err).Error("listening to socket") return } @@ -35,7 +35,7 @@ func startShimServer(ctx context.Context, id string) { } if err 
:= shim.RemoveSocket(socket); err != nil { - log.G(ctx).WithError(fmt.Errorf("remove pre-existing socket: %w", err)) + log.G(ctx).WithError(err).Error("remove pre-existing socket") } listener, err = shim.NewSocket(socket) @@ -57,7 +57,8 @@ func startShimServer(ctx context.Context, id string) { return } defer s.Close() - v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry()}) + + v1.RegisterShimService(s, &shimService{metrics: zeropod.NewRegistry(), events: events}) defer func() { listener.Close() @@ -67,7 +68,7 @@ func startShimServer(ctx context.Context, id string) { <-ctx.Done() - log.G(ctx).Info("stopping metrics server") + log.G(ctx).Info("stopping shim server") listener.Close() s.Close() _ = os.RemoveAll(socket) @@ -77,9 +78,35 @@ func startShimServer(ctx context.Context, id string) { // zeropod-specific functions like metrics. type shimService struct { metrics *prometheus.Registry + task wrapper + events chan *v1.ContainerStatus +} + +// SubscribeStatus watches for shim events. +func (s *shimService) SubscribeStatus(ctx context.Context, _ *v1.SubscribeStatusRequest, srv v1.Shim_SubscribeStatusServer) error { + for { + select { + case msg := <-s.events: + if err := srv.Send(msg); err != nil { + log.G(ctx).Errorf("unable to send event message: %s", err) + } + case <-ctx.Done(): + return nil + } + } +} + +// GetStatus returns the status of a zeropod container. +func (s *shimService) GetStatus(ctx context.Context, req *v1.ContainerRequest) (*v1.ContainerStatus, error) { + container, ok := s.task.zeropodContainers[req.Id] + if !ok { + return nil, fmt.Errorf("could not find zeropod container with id: %s", req.Id) + } + + return container.Status(), nil } -// Metrics implements v1.ShimService. +// Metrics returns metrics of the zeropod shim instance. 
func (s *shimService) Metrics(context.Context, *v1.MetricsRequest) (*v1.MetricsResponse, error) { mfs, err := s.metrics.Gather() return &v1.MetricsResponse{Metrics: mfs}, err diff --git a/zeropod/container.go b/zeropod/container.go index e5c3d1b..4581de5 100644 --- a/zeropod/container.go +++ b/zeropod/container.go @@ -16,6 +16,7 @@ import ( "github.com/containerd/log" "github.com/containernetworking/plugins/pkg/ns" "github.com/ctrox/zeropod/activator" + v1 "github.com/ctrox/zeropod/api/shim/v1" "github.com/ctrox/zeropod/socket" ) @@ -38,6 +39,7 @@ type Container struct { tracker socket.Tracker preRestore func() HandleStartedFunc postRestore func(*runc.Container, HandleStartedFunc) + events chan *v1.ContainerStatus // mutex to lock during checkpoint/restore operations since concurrent // restores can cause cgroup confusion. This mutex is shared between all @@ -45,7 +47,7 @@ type Container struct { checkpointRestore *sync.Mutex } -func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform) (*Container, error) { +func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Container, pt stdio.Platform, events chan *v1.ContainerStatus) (*Container, error) { p, err := container.Process("") if err != nil { return nil, errdefs.ToGRPC(err) @@ -89,9 +91,11 @@ func New(ctx context.Context, cfg *Config, cr *sync.Mutex, container *runc.Conta netNS: targetNS, tracker: tracker, checkpointRestore: cr, + events: events, } running.With(c.labels()).Set(1) + c.sendEvent(c.Status()) return c, c.initActivator(ctx) } @@ -159,6 +163,29 @@ func (c *Container) SetScaledDown(scaledDown bool) { running.With(c.labels()).Set(1) lastRestoreTime.With(c.labels()).Set(float64(time.Now().UnixNano())) } + c.sendEvent(c.Status()) +} + +func (c *Container) Status() *v1.ContainerStatus { + phase := v1.ContainerPhase_RUNNING + if c.ScaledDown() { + phase = v1.ContainerPhase_SCALED_DOWN + } + return &v1.ContainerStatus{ + Id: c.ID(), + Name: 
c.cfg.ContainerName, + PodName: c.cfg.PodName, + PodNamespace: c.cfg.PodNamespace, + Phase: phase, + } +} + +func (c *Container) sendEvent(event *v1.ContainerStatus) { + select { + case c.events <- event: + default: + log.G(c.context).Infof("channel full, discarding event: %v", event) + } } func (c *Container) ScaledDown() bool { From 1439719d6add7dd1e49037f84ae05c9aaef041a3 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sat, 25 May 2024 16:25:18 +0200 Subject: [PATCH 3/6] feat: implement in-place pod request scaling If enabled, we make use of the (still alpha) feature gate InPlacePodVerticalScaling to change the pod requests to a minimum on scaling down and increasing them to the original value on restore. This has been marked as experimental and also needs to be enabled by setting a flag on the manager and of course the feature flag needs to be enabled. --- README.md | 27 ++++- cmd/manager/main.go | 21 +++- config/base/node-daemonset.yaml | 3 + config/base/rbac.yaml | 33 +++++- config/examples/nginx.yaml | 6 +- config/kind/kustomization.yaml | 7 ++ e2e/e2e_test.go | 29 +++++ e2e/kind.yaml | 2 + e2e/setup_test.go | 11 ++ go.mod | 1 + go.sum | 2 + manager/pod_scaler.go | 196 ++++++++++++++++++++++++++++++++ manager/pod_scaler_test.go | 132 +++++++++++++++++++++ manager/status.go | 30 +++-- 14 files changed, 483 insertions(+), 17 deletions(-) create mode 100644 manager/pod_scaler.go create mode 100644 manager/pod_scaler_test.go diff --git a/README.md b/README.md index c46c452..2d2cc3a 100644 --- a/README.md +++ b/README.md @@ -285,13 +285,34 @@ the shim otherwise. For example, loading eBPF programs can be quite memory intensive so they have been moved from the shim to the manager to keep the shim memory usage as minimal as possible. -In addition to that it collects metrics from all the shim processes and -exposes those metrics on an HTTP endpoint. +These are the responsibilities of the manager: + +- Loading eBPF programs that the shim(s) rely on. 
+- Collect metrics from all shim processes and expose them on HTTP for scraping. +- Subscribes to shim scaling events and adjusts Pod requests. + +#### In-place Resource scaling (Experimental) + +This makes use of the feature flag +[InPlacePodVerticalScaling](https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/1287-in-place-update-pod-resources) +to automatically update the pod resource requests to a minimum on scale down +events and revert them again on scale up. Once the Kubernetes feature flag is +enabled, it also needs to be enabled using the manager flag +`-in-place-scaling=true`. + +#### Flags + +``` +-metrics-addr=":8080" sets the address of the metrics server +-debug enables debug logging +-in-place-scaling=false enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag +``` ## Metrics The zeropod-node pod exposes metrics on `0.0.0.0:8080/metrics` in Prometheus -format on each installed node. The following metrics are currently available: +format on each installed node. The metrics address can be configured with the +`-metrics-addr` flag. The following metrics are currently available: ```bash # HELP zeropod_checkpoint_duration_seconds The duration of the last checkpoint in seconds. 
diff --git a/cmd/manager/main.go b/cmd/manager/main.go index ae36861..3ef59f4 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -15,11 +15,18 @@ import ( ) var ( - metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + metricsAddr = flag.String("metrics-addr", ":8080", "address of the metrics server") + debug = flag.Bool("debug", true, "enable debug logs") + inPlaceScaling = flag.Bool("in-place-scaling", false, + "enable in-place resource scaling, requires InPlacePodVerticalScaling feature flag") ) func main() { flag.Parse() + + if *debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } slog.Info("starting manager", "metrics-addr", *metricsAddr) ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -36,7 +43,17 @@ func main() { os.Exit(1) } - if err := manager.StartSubscribers(ctx); err != nil { + subscribers := []manager.StatusHandler{} + if *inPlaceScaling { + podScaler, err := manager.NewPodScaler() + if err != nil { + slog.Error("podScaler init", "err", err) + os.Exit(1) + } + subscribers = append(subscribers, podScaler) + } + + if err := manager.StartSubscribers(ctx, subscribers...); err != nil { slog.Error("starting subscribers", "err", err) os.Exit(1) } diff --git a/config/base/node-daemonset.yaml b/config/base/node-daemonset.yaml index e4a2452..87fad0b 100644 --- a/config/base/node-daemonset.yaml +++ b/config/base/node-daemonset.yaml @@ -52,6 +52,9 @@ spec: - name: manager image: manager imagePullPolicy: IfNotPresent + command: ["/zeropod-manager"] + args: + - -metrics-addr=:8080 ports: - name: metrics containerPort: 8080 diff --git a/config/base/rbac.yaml b/config/base/rbac.yaml index 8ebdd8e..59a3958 100644 --- a/config/base/rbac.yaml +++ b/config/base/rbac.yaml @@ -8,7 +8,7 @@ metadata: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer rules: - apiGroups: - node.k8s.io @@ -22,11 +22,38 @@ 
rules: apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: runtimeclass-installer + name: zeropod:runtimeclass-installer roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: runtimeclass-installer + name: zeropod:runtimeclass-installer +subjects: + - kind: ServiceAccount + name: zeropod-node + namespace: zeropod-system +--- +# the manager needs to get/update pods for dynamic resource requests +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: zeropod:pod-updater +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: zeropod:pod-updater +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: zeropod:pod-updater subjects: - kind: ServiceAccount name: zeropod-node diff --git a/config/examples/nginx.yaml b/config/examples/nginx.yaml index 25bd61f..62e4804 100644 --- a/config/examples/nginx.yaml +++ b/config/examples/nginx.yaml @@ -3,7 +3,7 @@ kind: Deployment metadata: name: nginx spec: - replicas: 3 + replicas: 1 selector: matchLabels: app: nginx @@ -21,3 +21,7 @@ spec: name: nginx ports: - containerPort: 80 + resources: + requests: + cpu: 100m + memory: 128Mi diff --git a/config/kind/kustomization.yaml b/config/kind/kustomization.yaml index 559dc9b..ff89457 100644 --- a/config/kind/kustomization.yaml +++ b/config/kind/kustomization.yaml @@ -7,3 +7,10 @@ images: - name: installer newName: ghcr.io/ctrox/zeropod-installer newTag: dev +patches: + - patch: |- + - op: add + path: /spec/template/spec/containers/0/args/- + value: -in-place-scaling=true + target: + kind: DaemonSet diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index f4f36df..8e18176 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -9,11 +9,13 @@ import ( "testing" "time" + "github.com/ctrox/zeropod/manager" "github.com/ctrox/zeropod/zeropod" "github.com/prometheus/client_golang/prometheus" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/ptr" ) @@ -193,6 +195,33 @@ func TestE2E(t *testing.T) { // exec and should test the deletion in the restored state. }) + t.Run("resources scaling", func(t *testing.T) { + pod := testPod(scaleDownAfter(0), agnContainer("agn", 8080), resources(corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100m"), + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + })) + + cleanupPod := createPodAndWait(t, ctx, client, pod) + defer cleanupPod() + require.Eventually(t, func() bool { + if err := client.Get(ctx, objectName(pod), pod); err != nil { + return false + } + + resourcesScaledDown := false + for _, container := range pod.Status.ContainerStatuses { + t.Logf("allocated resources: %v", container.AllocatedResources) + resourcesScaledDown = container.AllocatedResources != nil && + container.AllocatedResources[corev1.ResourceCPU] == manager.ScaledDownCPU && + container.AllocatedResources[corev1.ResourceMemory] == manager.ScaledDownMemory + } + + return resourcesScaledDown + }, time.Second*10, time.Second) + }) + t.Run("metrics", func(t *testing.T) { // create two pods to test metric merging runningPod := testPod(scaleDownAfter(time.Hour)) diff --git a/e2e/kind.yaml b/e2e/kind.yaml index 911779e..8209778 100644 --- a/e2e/kind.yaml +++ b/e2e/kind.yaml @@ -1,5 +1,7 @@ kind: Cluster apiVersion: kind.x-k8s.io/v1alpha4 +featureGates: + InPlacePodVerticalScaling: true nodes: - role: control-plane extraMounts: diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 0373aa4..7fbf493 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -121,6 +121,9 @@ func startKind(t testing.TB, name string, port int) (c *rest.Config, err error) if err := provider.Create(name, cluster.CreateWithV1Alpha4Config(&v1alpha4.Cluster{ Name: name, + FeatureGates: map[string]bool{ + 
"InPlacePodVerticalScaling": true, + }, Nodes: []v1alpha4.Node{{ Labels: map[string]string{zeropod.NodeLabel: "true"}, // setup port map for our node port @@ -349,6 +352,14 @@ func portsAnnotation(portsMap string) podOption { }) } +func resources(res corev1.ResourceRequirements) podOption { + return func(p *pod) { + for i := range p.Spec.Containers { + p.Spec.Containers[i].Resources = res + } + } +} + const agnHostImage = "registry.k8s.io/e2e-test-images/agnhost:2.39" func agnContainer(name string, port int) podOption { diff --git a/go.mod b/go.mod index 8cc6cf4..c939e4e 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( github.com/docker/go-metrics v0.0.1 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.2 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-errors/errors v1.4.2 // indirect diff --git a/go.sum b/go.sum index f678aad..348374a 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= +github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/evanphx/json-patch/v5 v5.7.0 h1:nJqP7uwL84RJInrohHfW0Fx3awjbm8qZeFv0nW9SYGc= github.com/evanphx/json-patch/v5 v5.7.0/go.mod 
h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= diff --git a/manager/pod_scaler.go b/manager/pod_scaler.go new file mode 100644 index 0000000..ae4bf3e --- /dev/null +++ b/manager/pod_scaler.go @@ -0,0 +1,196 @@ +package manager + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + + v1 "github.com/ctrox/zeropod/api/shim/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +const ( + CPUAnnotationKey = "zeropod.ctrox.dev/cpu-requests" + MemoryAnnotationKey = "zeropod.ctrox.dev/memory-requests" +) + +var ( + ScaledDownCPU = resource.MustParse("1m") + ScaledDownMemory = resource.MustParse("1Ki") +) + +type containerResource map[string]resource.Quantity + +type PodScaler struct { + client client.Client +} + +func NewPodScaler() (*PodScaler, error) { + slog.Info("pod scaler init") + cfg, err := config.GetConfig() + if err != nil { + return nil, err + } + c, err := client.New(cfg, client.Options{}) + return &PodScaler{client: c}, err +} + +func (ps *PodScaler) Handle(ctx context.Context, status *v1.ContainerStatus) error { + clog := slog.With("container", status.Name, "pod", status.PodName, + "namespace", status.PodNamespace, "phase", status.Phase) + clog.Info("handling pod") + + pod := &corev1.Pod{} + podName := types.NamespacedName{Name: status.PodName, Namespace: status.PodNamespace} + if err := ps.client.Get(ctx, podName, pod); err != nil { + return err + } + + updatePod := false + for i, container := range pod.Spec.Containers { + if container.Name != status.Name { + continue + } + + _, hasCPU := container.Resources.Requests[corev1.ResourceCPU] + _, hasMemory := container.Resources.Requests[corev1.ResourceMemory] + if !hasCPU || !hasMemory { + clog.Debug("ignoring container without resources") + continue + } + + initial, err := ps.initialRequests(container, 
pod.Annotations) + if err != nil { + return fmt.Errorf("getting initial requests from pod failed: %w", err) + } + + current := container.Resources.Requests + if ps.isUpToDate(initial, current, status) { + clog.Debug("container is up to date", "initial", printResources(initial)) + continue + } + + if err := ps.setAnnotations(pod); err != nil { + return err + } + + new := ps.newRequests(initial, current, status) + pod.Spec.Containers[i].Resources.Requests = new + clog.Debug("container needs to be updated", "current", printResources(current), "new", printResources(new)) + updatePod = true + } + + if !updatePod { + return nil + } + + if err := ps.updateRequests(ctx, pod); err != nil { + if errors.IsInvalid(err) { + clog.Error("in-place scaling failed, ensure InPlacePodVerticalScaling feature flag is enabled") + return nil + } + return err + } + + return nil +} + +func (ps *PodScaler) isUpToDate(initial, current corev1.ResourceList, status *v1.ContainerStatus) bool { + switch status.Phase { + case v1.ContainerPhase_SCALED_DOWN: + return current[corev1.ResourceCPU] == ScaledDownCPU && + current[corev1.ResourceMemory] == ScaledDownMemory + case v1.ContainerPhase_RUNNING: + return current[corev1.ResourceCPU] == initial[corev1.ResourceCPU] && + current[corev1.ResourceMemory] == initial[corev1.ResourceMemory] + default: + return true + } +} + +func (ps *PodScaler) newRequests(initial, current corev1.ResourceList, status *v1.ContainerStatus) corev1.ResourceList { + switch status.Phase { + case v1.ContainerPhase_SCALED_DOWN: + current[corev1.ResourceCPU] = ScaledDownCPU + current[corev1.ResourceMemory] = ScaledDownMemory + return current + case v1.ContainerPhase_RUNNING: + return initial + default: + return current + } +} + +func (ps *PodScaler) initialRequests(container corev1.Container, podAnnotations map[string]string) (corev1.ResourceList, error) { + initial := container.DeepCopy().Resources.Requests + containerCPUs := containerResource{} + if cpuReq, ok := 
podAnnotations[CPUAnnotationKey]; ok { + if err := json.Unmarshal([]byte(cpuReq), &containerCPUs); err != nil { + return nil, err + } + } + + containerMemory := containerResource{} + if memortReq, ok := podAnnotations[MemoryAnnotationKey]; ok { + if err := json.Unmarshal([]byte(memortReq), &containerMemory); err != nil { + return nil, err + } + } + + if cpu, ok := containerCPUs[container.Name]; ok { + initial[corev1.ResourceCPU] = cpu + } + + if memory, ok := containerMemory[container.Name]; ok { + initial[corev1.ResourceMemory] = memory + } + + return initial, nil +} + +func (ps *PodScaler) setAnnotations(pod *corev1.Pod) error { + containerCPUs := containerResource{} + containerMemory := containerResource{} + for _, container := range pod.Spec.Containers { + containerCPUs[container.Name] = container.Resources.Requests[corev1.ResourceCPU] + containerMemory[container.Name] = container.Resources.Requests[corev1.ResourceMemory] + } + + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + + if _, ok := pod.Annotations[CPUAnnotationKey]; !ok { + val, err := json.Marshal(containerCPUs) + if err != nil { + return err + } + pod.Annotations[CPUAnnotationKey] = string(val) + } + + if _, ok := pod.Annotations[MemoryAnnotationKey]; !ok { + val, err := json.Marshal(containerMemory) + if err != nil { + return err + } + pod.Annotations[MemoryAnnotationKey] = string(val) + } + + return nil +} + +func (ps *PodScaler) updateRequests(ctx context.Context, pod *corev1.Pod) error { + return ps.client.Update(ctx, pod) +} + +func printResources(res corev1.ResourceList) string { + cpu := res[corev1.ResourceCPU] + memory := res[corev1.ResourceMemory] + return fmt.Sprintf("cpu: %s, memory: %s", cpu.String(), memory.String()) +} diff --git a/manager/pod_scaler_test.go b/manager/pod_scaler_test.go new file mode 100644 index 0000000..8b1f978 --- /dev/null +++ b/manager/pod_scaler_test.go @@ -0,0 +1,132 @@ +package manager + +import ( + "context" + "log/slog" + "testing" + 
+ v1 "github.com/ctrox/zeropod/api/shim/v1" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestHandlePod(t *testing.T) { + slog.SetLogLoggerLevel(slog.LevelDebug) + + scheme := runtime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { + t.Fatal(err) + } + runningCPU, runningMemory := resource.MustParse("100m"), resource.MustParse("100Mi") + + cases := map[string]struct { + statusEventPhase v1.ContainerPhase + beforeEvent corev1.ResourceList + expected corev1.ResourceList + }{ + "running pod is not updated": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + }, + "running is updated when scaling down": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: runningCPU, + corev1.ResourceMemory: runningMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod is not updated": { + statusEventPhase: v1.ContainerPhase_SCALED_DOWN, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + }, + "scaled down pod requests are restored when starting": { + statusEventPhase: v1.ContainerPhase_RUNNING, + beforeEvent: corev1.ResourceList{ + corev1.ResourceCPU: ScaledDownCPU, + corev1.ResourceMemory: ScaledDownMemory, + }, + expected: corev1.ResourceList{ + corev1.ResourceCPU: 
runningCPU, + corev1.ResourceMemory: runningMemory, + }, + }, + } + + for name, tc := range cases { + tc := tc + t.Run(name, func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(scheme).Build() + ps := &PodScaler{ + client: client, + } + + initialPod := newPod(corev1.ResourceList{corev1.ResourceCPU: runningCPU, corev1.ResourceMemory: runningMemory}) + ps.setAnnotations(initialPod) + pod := newPod(tc.beforeEvent) + pod.SetAnnotations(initialPod.GetAnnotations()) + + ctx := context.Background() + if err := client.Create(ctx, pod); err != nil { + t.Fatal(err) + } + + if err := ps.Handle( + context.Background(), + &v1.ContainerStatus{ + Name: pod.Spec.Containers[0].Name, + PodName: pod.Name, + PodNamespace: pod.Namespace, + Phase: tc.statusEventPhase, + }, + ); err != nil { + t.Fatal(err) + } + + if err := client.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod); err != nil { + t.Fatal(err) + } + + assert.Equal(t, pod.Spec.Containers[0].Resources.Requests, tc.expected) + }) + } +} + +func newPod(req corev1.ResourceList) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "scaled-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "first-container", + Resources: corev1.ResourceRequirements{ + Requests: req, + }, + }}, + }, + } +} diff --git a/manager/status.go b/manager/status.go index 88248af..0279b8c 100644 --- a/manager/status.go +++ b/manager/status.go @@ -17,7 +17,17 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -func StartSubscribers(ctx context.Context) error { +type StatusHandler interface { + Handle(context.Context, *v1.ContainerStatus) error +} + +func StartSubscribers(ctx context.Context, handlers ...StatusHandler) error { + if _, err := os.Stat(task.ShimSocketPath); errors.Is(err, os.ErrNotExist) { + if err := os.Mkdir(task.ShimSocketPath, os.ModePerm); err != nil { + return err + } + } + socks, err := 
os.ReadDir(task.ShimSocketPath) if err != nil { return fmt.Errorf("error listing file in shim socket path: %s", err) @@ -26,18 +36,18 @@ func StartSubscribers(ctx context.Context) error { for _, sock := range socks { sock := sock go func() { - if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name())); err != nil { + if err := subscribe(ctx, filepath.Join(task.ShimSocketPath, sock.Name()), handlers); err != nil { slog.Error("error subscribing", "sock", sock.Name(), "err", err) } }() } - go watchForShims(ctx) + go watchForShims(ctx, handlers) return nil } -func subscribe(ctx context.Context, sock string) error { +func subscribe(ctx context.Context, sock string, handlers []StatusHandler) error { log := slog.With("sock", sock) log.Info("subscribing to status events") @@ -64,15 +74,19 @@ func subscribe(ctx context.Context, sock string) error { } break } - slog.Info("received status", - "container", status.Name, "pod", status.PodName, + clog := slog.With("container", status.Name, "pod", status.PodName, "namespace", status.PodNamespace, "phase", status.Phase) + for _, h := range handlers { + if err := h.Handle(ctx, status); err != nil { + clog.Error("handling status update", "err", err) + } + } } return nil } -func watchForShims(ctx context.Context) error { +func watchForShims(ctx context.Context, handlers []StatusHandler) error { watcher, err := fsnotify.NewWatcher() if err != nil { return err @@ -88,7 +102,7 @@ func watchForShims(ctx context.Context) error { case event := <-watcher.Events: switch event.Op { case fsnotify.Create: - if err := subscribe(ctx, event.Name); err != nil { + if err := subscribe(ctx, event.Name, handlers); err != nil { slog.Error("error subscribing", "sock", event.Name, "err", err) } } From 2b896f97199fdc41a4e629835e35d05b92237a3c Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 2 Jun 2024 10:00:32 +0200 Subject: [PATCH 4/6] feat: generate ttrpc during build --- .github/workflows/ci.yml | 14 ++++++++++++-- Makefile | 5 
+++-- api/shim/v1/shim.pb.go | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 54b5156..1591857 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: run: sudo --preserve-env make test build: - runs-on: ubuntu-latest + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v3 @@ -46,10 +46,20 @@ jobs: with: go-version: "1.22" + - name: Install protoc-gen-go + run: | + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31 + go install github.com/containerd/ttrpc/cmd/protoc-gen-go-ttrpc@v1.2.4 + + - uses: awalsh128/cache-apt-pkgs-action@v1 + with: + packages: protobuf-compiler libprotobuf-dev + version: 1.0 + - name: build ebpf image run: make build-ebpf - - name: generate ebpf + - name: generate ttrpc and ebpf run: make generate - name: check for diff diff --git a/Makefile b/Makefile index 4d88d3f..2fb96ee 100644 --- a/Makefile +++ b/Makefile @@ -80,14 +80,15 @@ CFLAGS := -O2 -g -Wall -Werror # dependencies installed. generate: export BPF_CLANG := $(CLANG) generate: export BPF_CFLAGS := $(CFLAGS) -generate: +generate: ttrpc docker run --rm -v $(PWD):/app:Z --user $(shell id -u):$(shell id -g) --env=BPF_CLANG="$(CLANG)" --env=BPF_CFLAGS="$(CFLAGS)" $(EBPF_IMAGE) ttrpc: + go mod download cd api/shim/v1; protoc --go_out=. --go_opt=paths=source_relative \ --ttrpc_out=. --plugin=protoc-gen-ttrpc=`which protoc-gen-go-ttrpc` \ --ttrpc_opt=paths=source_relative *.proto -I. \ - -I ${GOPATH}/pkg/mod/github.com/prometheus/client_model@v0.5.0 + -I $(shell go env GOMODCACHE)/github.com/prometheus/client_model@v0.5.0 # to improve reproducibility of the bpf builds, we dump the vmlinux.h and # store it compressed in git instead of dumping it during the build. diff --git a/api/shim/v1/shim.pb.go b/api/shim/v1/shim.pb.go index a60b8af..c94d8aa 100644 --- a/api/shim/v1/shim.pb.go +++ b/api/shim/v1/shim.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. 
DO NOT EDIT. // versions: // protoc-gen-go v1.31.0 -// protoc v3.19.6 +// protoc v3.21.12 // source: shim.proto package v1 From 70be5183b9e0903d2bb323d57fbc3116fa1ddb6e Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 2 Jun 2024 11:36:30 +0200 Subject: [PATCH 5/6] feat: add kustomize component for enabling in-place-scaling This adds a kustomize component to enable the in-place-scaling feature on demand. It is enabled by default for the kind overlay for local and e2e testing. --- README.md | 6 ++++- config/base/rbac.yaml | 27 ---------------------- config/in-place-scaling/kustomization.yaml | 11 +++++++++ config/in-place-scaling/rbac.yaml | 26 +++++++++++++++++++++ config/kind/kustomization.yaml | 9 ++------ config/production/kustomization.yaml | 3 +++ 6 files changed, 47 insertions(+), 35 deletions(-) create mode 100644 config/in-place-scaling/kustomization.yaml create mode 100644 config/in-place-scaling/rbac.yaml diff --git a/README.md b/README.md index 2d2cc3a..314d822 100644 --- a/README.md +++ b/README.md @@ -298,7 +298,11 @@ This makes use of the feature flag to automatically update the pod resource requests to a minimum on scale down events and revert them again on scale up. Once the Kubernetes feature flag is enabled, it also needs to be enabled using the manager flag -`-in-place-scaling=true`. +`-in-place-scaling=true` plus some additional permissions are required for the +node driver to patch pods. To deploy this, simply uncomment the +`in-place-scaling` component in the `config/production/kustomization.yaml`. +This will add the flag and the required permissions when building the +kustomization. 
#### Flags diff --git a/config/base/rbac.yaml b/config/base/rbac.yaml index 59a3958..aab8fac 100644 --- a/config/base/rbac.yaml +++ b/config/base/rbac.yaml @@ -31,30 +31,3 @@ subjects: - kind: ServiceAccount name: zeropod-node namespace: zeropod-system ---- -# the manager needs to get/update pods for dynamic resource requests -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: zeropod:pod-updater -rules: - - apiGroups: - - "" - resources: - - pods - verbs: - - get - - update ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: zeropod:pod-updater -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: zeropod:pod-updater -subjects: - - kind: ServiceAccount - name: zeropod-node - namespace: zeropod-system diff --git a/config/in-place-scaling/kustomization.yaml b/config/in-place-scaling/kustomization.yaml new file mode 100644 index 0000000..70abbf1 --- /dev/null +++ b/config/in-place-scaling/kustomization.yaml @@ -0,0 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - rbac.yaml +patches: + - patch: |- + - op: add + path: /spec/template/spec/containers/0/args/- + value: -in-place-scaling=true + target: + kind: DaemonSet diff --git a/config/in-place-scaling/rbac.yaml b/config/in-place-scaling/rbac.yaml new file mode 100644 index 0000000..0bc9b66 --- /dev/null +++ b/config/in-place-scaling/rbac.yaml @@ -0,0 +1,26 @@ +# the manager needs to get/update pods for dynamic resource requests +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: zeropod:pod-updater +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - update +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: zeropod:pod-updater +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: zeropod:pod-updater +subjects: + - kind: ServiceAccount + name: zeropod-node + namespace: zeropod-system 
diff --git a/config/kind/kustomization.yaml b/config/kind/kustomization.yaml index ff89457..eb58447 100644 --- a/config/kind/kustomization.yaml +++ b/config/kind/kustomization.yaml @@ -1,5 +1,7 @@ resources: - ../base +components: + - ../in-place-scaling images: - name: manager newName: ghcr.io/ctrox/zeropod-manager @@ -7,10 +9,3 @@ images: - name: installer newName: ghcr.io/ctrox/zeropod-installer newTag: dev -patches: - - patch: |- - - op: add - path: /spec/template/spec/containers/0/args/- - value: -in-place-scaling=true - target: - kind: DaemonSet diff --git a/config/production/kustomization.yaml b/config/production/kustomization.yaml index f3848d9..eadd65d 100644 --- a/config/production/kustomization.yaml +++ b/config/production/kustomization.yaml @@ -14,3 +14,6 @@ patches: value: -criu-image=ghcr.io/ctrox/zeropod-criu:v3.19 target: kind: DaemonSet +# uncomment to enable in-place scaling +# components: +# - ../in-place-scaling From 09265b4e590d4104e551a13ccbfca1399522e491 Mon Sep 17 00:00:00 2001 From: Cyrill Troxler Date: Sun, 2 Jun 2024 12:32:15 +0200 Subject: [PATCH 6/6] test: fail metrics tests instead of erroring --- e2e/setup_test.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/e2e/setup_test.go b/e2e/setup_test.go index 7fbf493..4e2c1e4 100644 --- a/e2e/setup_test.go +++ b/e2e/setup_test.go @@ -547,7 +547,7 @@ func restoreCount(t testing.TB, client client.Client, cfg *rest.Config, pod *cor restoreDuration := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricRestoreDuration) val, ok := mfs[restoreDuration] if !ok { - t.Errorf("could not find expected metric: %s", restoreDuration) + t.Fatalf("could not find expected metric: %s", restoreDuration) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ -555,15 +555,15 @@ func restoreCount(t testing.TB, client client.Client, cfg *rest.Config, pod *cor zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not 
find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Histogram == nil { - t.Errorf("found metric that is not a histogram") + t.Fatalf("found metric that is not a histogram") } if metric.Histogram.SampleCount == nil { - t.Errorf("histogram sample count is nil") + t.Fatalf("histogram sample count is nil") } return int(*metric.Histogram.SampleCount) @@ -575,7 +575,7 @@ func checkpointCount(t testing.TB, client client.Client, cfg *rest.Config, pod * checkpointDuration := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricCheckPointDuration) val, ok := mfs[checkpointDuration] if !ok { - t.Errorf("could not find expected metric: %s", checkpointDuration) + t.Fatalf("could not find expected metric: %s", checkpointDuration) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ -583,15 +583,15 @@ func checkpointCount(t testing.TB, client client.Client, cfg *rest.Config, pod * zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Histogram == nil { - t.Errorf("found metric that is not a histogram") + t.Fatalf("found metric that is not a histogram") } if metric.Histogram.SampleCount == nil { - t.Errorf("histogram sample count is nil") + t.Fatalf("histogram sample count is nil") } return int(*metric.Histogram.SampleCount) @@ -603,7 +603,7 @@ func isCheckpointed(t testing.TB, client client.Client, cfg *rest.Config, pod *c running := prometheus.BuildFQName(zeropod.MetricsNamespace, "", zeropod.MetricRunning) val, ok := mfs[running] if !ok { - t.Errorf("could not find expected metric: %s", running) + t.Fatalf("could not find expected metric: %s", running) } metric, ok := findMetricByLabelMatch(val.Metric, map[string]string{ @@ 
-611,15 +611,15 @@ func isCheckpointed(t testing.TB, client client.Client, cfg *rest.Config, pod *c zeropod.LabelPodNamespace: pod.Namespace, }) if !ok { - t.Errorf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) + t.Fatalf("could not find running metric that matches pod: %s/%s", pod.Name, pod.Namespace) } if metric.Gauge == nil { - t.Errorf("found metric that is not a gauge") + t.Fatalf("found metric that is not a gauge") } if metric.Gauge.Value == nil { - t.Errorf("gauge value is nil") + t.Fatalf("gauge value is nil") } return *metric.Gauge.Value == 0 && checkpointCount(t, client, cfg, pod) >= 1