From 76333ddf5762687e81c1f1fbca2e30d3bc203b6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Sat, 6 May 2023 23:10:15 +0200 Subject: [PATCH] feat(k8sprocessor)!: use Endpointslices for Service metadata This improves memory utilization significantly for large clusters. --- .changelog/1422.breaking.txt | 3 + pkg/processor/k8sprocessor/kube/owner.go | 135 ++++++++++++------ pkg/processor/k8sprocessor/kube/owner_test.go | 99 ++++++++----- 3 files changed, 161 insertions(+), 76 deletions(-) create mode 100644 .changelog/1422.breaking.txt diff --git a/.changelog/1422.breaking.txt b/.changelog/1422.breaking.txt new file mode 100644 index 0000000000..894bf527cc --- /dev/null +++ b/.changelog/1422.breaking.txt @@ -0,0 +1,3 @@ +feat(k8sprocessor)!: use Endpointslices for Service metadata + +If Service metadata is enabled in k8sprocessor configuration, it needs get/list/watch RBAC permission for EndpointSlices. \ No newline at end of file diff --git a/pkg/processor/k8sprocessor/kube/owner.go b/pkg/processor/k8sprocessor/kube/owner.go index b2c6550e15..b2742fa1a8 100644 --- a/pkg/processor/k8sprocessor/kube/owner.go +++ b/pkg/processor/k8sprocessor/kube/owner.go @@ -21,6 +21,7 @@ import ( "go.uber.org/zap" api_v1 "k8s.io/api/core/v1" + discovery_v1 "k8s.io/api/discovery/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -32,6 +33,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor/observability" ) +const endpointSliceServiceLabel = "kubernetes.io/service-name" + // OwnerProvider allows to dynamically assign constructor type OwnerProvider func( logger *zap.Logger, @@ -134,7 +137,9 @@ func newOwnerProvider( ownerCache.addOwnerInformer("DaemonSet", factory.Apps().V1().DaemonSets().Informer(), ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } // Only enable ReplicaSet informer when ReplicaSet or DeploymentName extraction rule is enabled @@ -143,7 +148,9 @@ func newOwnerProvider( ownerCache.addOwnerInformer("ReplicaSet", factory.Apps().V1().ReplicaSets().Informer(), ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } // Only enable Deployment informer when Deployment extraction rule is enabled @@ -152,7 +159,9 @@ func newOwnerProvider( ownerCache.addOwnerInformer("Deployment", factory.Apps().V1().Deployments().Informer(), ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } // Only enable StatefulSet informer when StatefulSet extraction rule is enabled @@ -161,16 +170,27 @@ func newOwnerProvider( ownerCache.addOwnerInformer("StatefulSet", factory.Apps().V1().StatefulSets().Informer(), ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } // Only enable Endpoint informer when Endpoint extraction rule is enabled if extractionRules.ServiceName { - logger.Debug("adding informer for Endpoint", zap.String("api_version", "v1")) - ownerCache.addOwnerInformer("Endpoint", - factory.Core().V1().Endpoints().Informer(), - ownerCache.cacheEndpoint, - ownerCache.deleteEndpoint) + logger.Debug("adding informer for EndpointSlice", zap.String("api_version", "discovery.k8s.io/v1")) + ownerCache.addOwnerInformer("EndpointSlice", + factory.Discovery().V1().EndpointSlices().Informer(), + ownerCache.cacheEndpointSlice, + ownerCache.deleteEndpointSlice, + func(object interface{}) (interface{}, error) { + originalES, success := 
object.(*discovery_v1.EndpointSlice) + if !success { + return object.(cache.DeletedFinalStateUnknown), nil + } else { + return removeUnnecessaryEndpointSliceData(originalES), nil + } + }, + ) } // Only enable Job informer when Job or CronJob extraction rule is enabled @@ -179,7 +199,9 @@ func newOwnerProvider( ownerCache.addOwnerInformer("Job", factory.Batch().V1().Jobs().Informer(), ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } // Only enable CronJob informer when CronJob extraction rule is enabled @@ -199,7 +221,9 @@ func newOwnerProvider( ownerCache.addOwnerInformer("CronJob", informer, ownerCache.cacheObject, - ownerCache.deleteObject) + ownerCache.deleteObject, + nil, + ) } handleAPIResources := func(informer cache.SharedIndexInformer, apiResources []meta_v1.APIResource) bool { @@ -322,6 +346,7 @@ func (op *OwnerCache) addOwnerInformer( informer cache.SharedIndexInformer, cacheFunc func(kind string, obj interface{}), deleteFunc func(obj interface{}), + transformFunc cache.TransformFunc, ) { _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -338,7 +363,13 @@ func (op *OwnerCache) addOwnerInformer( }), }) if err != nil { - op.logger.Error("error adding event handler to namespace informer", zap.Error(err)) + op.logger.Error("error adding event handler to owner informer", zap.Error(err), zap.String("kind", kind)) + } + + if transformFunc != nil { + if err = informer.SetTransform(transformFunc); err != nil { + op.logger.Error("error adding transform to owner informer", zap.Error(err), zap.String("kind", kind)) + } } op.informers = append(op.informers, informer) @@ -392,7 +423,7 @@ func (op *OwnerCache) cacheObject(kind string, obj interface{}) { op.ownersMutex.Unlock() } -func (op *OwnerCache) addEndpointToPod(pod string, endpoint string) { +func (op *OwnerCache) addServiceToPod(pod string, serviceName string) { op.podServicesMutex.Lock() defer op.podServicesMutex.Unlock() @@ -400,22 +431,22 @@ func (op *OwnerCache) addEndpointToPod(pod string, endpoint string) { if !ok { // If there's no services/endpoints for a given pod then just update the cache // with the provided enpoint. - op.podServices[pod] = []string{endpoint} + op.podServices[pod] = []string{serviceName} return } for _, it := range services { - if it == endpoint { + if it == serviceName { return } } - services = append(services, endpoint) + services = append(services, serviceName) sort.Strings(services) op.podServices[pod] = services } -func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) { +func (op *OwnerCache) deleteServiceFromPod(pod string, serviceName string) { op.podServicesMutex.Lock() defer op.podServicesMutex.Unlock() @@ -426,7 +457,7 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) { for i := 0; len(services) > 0; { service := services[i] - if service == endpoint { + if service == serviceName { // Remove the ith entry by... 
l := len(services) last := services[l-1] @@ -451,50 +482,45 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) { } } -func (op *OwnerCache) genericEndpointOp(obj interface{}, endpointFunc func(pod string, endpoint string)) { - var ep *api_v1.Endpoints - +func (op *OwnerCache) genericEndpointSliceOp(obj interface{}, serviceFunc func(pod string, serviceName string)) { + var endpointSlice *discovery_v1.EndpointSlice switch obj := obj.(type) { - case *api_v1.Endpoints: - ep = obj + case *discovery_v1.EndpointSlice: + endpointSlice = obj case cache.DeletedFinalStateUnknown: - prev, ok := obj.Obj.(*api_v1.Endpoints) + prev, ok := obj.Obj.(*discovery_v1.EndpointSlice) if !ok { op.logger.Error( - "object received was DeletedFinalStateUnknown but did not contain api_v1.Endpoints", + "object received was DeletedFinalStateUnknown but did not contain EndpointSlice", zap.Any("received", obj), ) return } - ep = prev + endpointSlice = prev default: op.logger.Error( - "object received was not of type api_v1.Endpoints", + "object received was not of type EndpointSlice", zap.Any("received", obj), ) return } - for _, it := range ep.Subsets { - for _, addr := range it.Addresses { - if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" { - endpointFunc(addr.TargetRef.Name, ep.Name) - } - } - for _, addr := range it.NotReadyAddresses { - if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" { - endpointFunc(addr.TargetRef.Name, ep.Name) - } + epLabels := endpointSlice.GetLabels() + serviceName := epLabels[endpointSliceServiceLabel] // see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership + + for _, endpoint := range endpointSlice.Endpoints { + if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" { + serviceFunc(endpoint.TargetRef.Name, serviceName) } } } -func (op *OwnerCache) deleteEndpoint(obj interface{}) { - op.genericEndpointOp(obj, op.deleteEndpointFromPod) +func (op *OwnerCache) deleteEndpointSlice(obj interface{}) { + op.genericEndpointSliceOp(obj, op.deleteServiceFromPod) } -func (op *OwnerCache) cacheEndpoint(kind string, obj interface{}) { - op.genericEndpointOp(obj, op.addEndpointToPod) +func (op *OwnerCache) cacheEndpointSlice(kind string, obj interface{}) { + op.genericEndpointSliceOp(obj, op.addServiceToPod) } // GetNamespaces returns a cached namespace object (if one is found) or nil otherwise @@ -599,3 +625,30 @@ type ownerCacheEviction struct { ts time.Time evict func() } + +// This function removes all data from the EndpointSlice except what is required by the client +func removeUnnecessaryEndpointSliceData(endpointSlice *discovery_v1.EndpointSlice) *discovery_v1.EndpointSlice { + // name and namespace are needed by the informer store + transformedEndpointSlice := discovery_v1.EndpointSlice{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: endpointSlice.GetName(), + Namespace: endpointSlice.GetNamespace(), + }, + } + + // we need a particular label to get the Service name + serviceName := endpointSlice.GetLabels()[endpointSliceServiceLabel] + transformedEndpointSlice.SetLabels(map[string]string{ + endpointSliceServiceLabel: serviceName, + }) + + // and for each endpoint, we need the targetRef + transformedEndpointSlice.Endpoints = make([]discovery_v1.Endpoint, len(endpointSlice.Endpoints)) + for i, endpoint := range endpointSlice.Endpoints { + if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" { + transformedEndpointSlice.Endpoints[i].TargetRef = endpoint.TargetRef + } + } + + return &transformedEndpointSlice 
+} diff --git a/pkg/processor/k8sprocessor/kube/owner_test.go b/pkg/processor/k8sprocessor/kube/owner_test.go index b5099fa66b..ffd7a032b7 100644 --- a/pkg/processor/k8sprocessor/kube/owner_test.go +++ b/pkg/processor/k8sprocessor/kube/owner_test.go @@ -13,6 +13,7 @@ import ( v1 "k8s.io/api/apps/v1" batch_v1 "k8s.io/api/batch/v1" api_v1 "k8s.io/api/core/v1" + discovery_v1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -464,7 +465,7 @@ func Test_OwnerProvider_GetServices(t *testing.T) { require.NoError(t, err) client := c.(*fake.Clientset) - ch := waitForWatchToBeEstablished(client, "endpoints") + ch := waitForWatchToBeEstablished(client, "endpointslices") op.Start() t.Cleanup(func() { @@ -479,65 +480,77 @@ func Test_OwnerProvider_GetServices(t *testing.T) { UID: "f15f0585-a0bc-43a3-96e4-dd2eace75392", }, } - endpoints1 = &api_v1.Endpoints{ + endpointSlice1 = &discovery_v1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "my-service", + Name: "my-service-abc", Namespace: namespace, UID: "88125104-a4f6-40ac-906b-fcd385c127f3", + Labels: map[string]string{ + "kubernetes.io/service-name": "my-service", + }, }, TypeMeta: metav1.TypeMeta{ - Kind: "Endpoint", + Kind: "EndpointSlice", }, - Subsets: []api_v1.EndpointSubset{ + Endpoints: []discovery_v1.Endpoint{ { - Addresses: []api_v1.EndpointAddress{ - { - TargetRef: &api_v1.ObjectReference{ - Name: pod.Name, - Namespace: namespace, - Kind: "Pod", - UID: pod.UID, - }, - }, + TargetRef: &api_v1.ObjectReference{ + Name: pod.Name, + Namespace: namespace, + Kind: "Pod", + UID: pod.UID, }, }, }, } - endpoints2 = &api_v1.Endpoints{ + endpointSlice2 = &discovery_v1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "my-service-2", + Name: "my-service-2-def", Namespace: namespace, UID: "07ffe4a1-ca89-4d28-acb5-808b0c0bb20f", + Labels: map[string]string{ + "kubernetes.io/service-name": "my-service-2", + }, }, TypeMeta: metav1.TypeMeta{ - Kind: "Endpoint", + Kind: "EndpointSlice", }, - Subsets: []api_v1.EndpointSubset{ + Endpoints: []discovery_v1.Endpoint{ { - Addresses: []api_v1.EndpointAddress{ - { - TargetRef: &api_v1.ObjectReference{ - Name: pod.Name, - Namespace: namespace, - Kind: "Pod", - UID: pod.UID, - }, - }, + TargetRef: &api_v1.ObjectReference{ + Name: pod.Name, + Namespace: namespace, + Kind: "Pod", + UID: pod.UID, }, }, }, } + endpointSlice2Updated = &discovery_v1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-service-2-def", + Namespace: namespace, + UID: "07ffe4a1-ca89-4d28-acb5-808b0c0bb20f", + Labels: map[string]string{ + "kubernetes.io/service-name": "my-service-2", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + }, + Endpoints: []discovery_v1.Endpoint{}, + } ) <-ch t.Run("adding endpoints", func(t *testing.T) { - _, err = c.CoreV1().Endpoints(namespace). - Create(context.Background(), endpoints1, metav1.CreateOptions{}) + _, err = c.DiscoveryV1().EndpointSlices(namespace). + Create(context.Background(), endpointSlice1, metav1.CreateOptions{}) require.NoError(t, err) - _, err = c.CoreV1().Endpoints(namespace). - Create(context.Background(), endpoints2, metav1.CreateOptions{}) + _, err = c.DiscoveryV1().EndpointSlices(namespace). + Create(context.Background(), endpointSlice2, metav1.CreateOptions{}) require.NoError(t, err) _, err = c.CoreV1().Pods(namespace). 
@@ -555,9 +568,25 @@ func Test_OwnerProvider_GetServices(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) }) + t.Run("updating endpoints", func(t *testing.T) { + t.Skip("Known bug, see https://github.com/SumoLogic/sumologic-otel-collector/issues/1414") + _, err = c.DiscoveryV1().EndpointSlices(namespace). + Update(context.Background(), endpointSlice2Updated, metav1.UpdateOptions{}) + require.NoError(t, err) + assert.Eventually(t, func() bool { + services := op.GetServices(pod.Name) + if len(services) != 1 { + t.Logf("services: %v", services) + return false + } + + return len(services) == 1 + }, 5*time.Second, 10*time.Millisecond) + }) + t.Run("deleting endpoints", func(t *testing.T) { - err = c.CoreV1().Endpoints(namespace). - Delete(context.Background(), endpoints1.Name, metav1.DeleteOptions{}) + err = c.DiscoveryV1().EndpointSlices(namespace). + Delete(context.Background(), endpointSlice1.Name, metav1.DeleteOptions{}) require.NoError(t, err) assert.Eventually(t, func() bool { services := op.GetServices(pod.Name) @@ -570,8 +599,8 @@ func Test_OwnerProvider_GetServices(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) deleteSentAt := time.Now() - err = c.CoreV1().Endpoints(namespace). - Delete(context.Background(), endpoints2.Name, metav1.DeleteOptions{}) + err = c.DiscoveryV1().EndpointSlices(namespace). + Delete(context.Background(), endpointSlice2.Name, metav1.DeleteOptions{}) require.NoError(t, err) var ttd time.Duration
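
Note on the RBAC requirement called out in .changelog/1422.breaking.txt: because the processor now watches EndpointSlices from the discovery.k8s.io API group instead of core/v1 Endpoints, the collector's ClusterRole needs one additional rule whenever Service metadata extraction is enabled. The sketch below is illustrative only and is not part of this patch; the ClusterRole name is hypothetical, and the real object is defined by whatever manifest or Helm chart deploys the collector.

package main

import (
	"fmt"

	rbac_v1 "k8s.io/api/rbac/v1"
	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical ClusterRole name; only the rule itself is implied by this change.
	clusterRole := rbac_v1.ClusterRole{
		ObjectMeta: meta_v1.ObjectMeta{Name: "otelcol-k8sprocessor"},
		Rules: []rbac_v1.PolicyRule{
			{
				// get/list/watch on EndpointSlices, as required when the
				// k8sprocessor extracts the Service name for a pod.
				APIGroups: []string{"discovery.k8s.io"},
				Resources: []string{"endpointslices"},
				Verbs:     []string{"get", "list", "watch"},
			},
		},
	}
	fmt.Printf("%+v\n", clusterRole)
}

In manifest form this corresponds to a single entry under rules: with apiGroups ["discovery.k8s.io"], resources ["endpointslices"], and verbs ["get", "list", "watch"].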