Skip to content

Commit

Permalink
feat(k8sprocessor)!: use Endpointslices for Service metadata
Browse files Browse the repository at this point in the history
This improves memory utilization significantly for large clusters.
  • Loading branch information
Mikołaj Świątek authored and swiatekm committed Jan 11, 2024
1 parent 5fddc7e commit 76333dd
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .changelog/1422.breaking.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
feat(k8sprocessor)!: use Endpointslices for Service metadata

If Service metadata is enabled in the k8sprocessor configuration, the processor needs get/list/watch RBAC permissions for EndpointSlices.
135 changes: 94 additions & 41 deletions pkg/processor/k8sprocessor/kube/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
discovery_v1 "k8s.io/api/discovery/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -32,6 +33,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor/observability"
)

const endpointSliceServiceLabel = "kubernetes.io/service-name"

// OwnerProvider allows to dynamically assign constructor
type OwnerProvider func(
logger *zap.Logger,
Expand Down Expand Up @@ -134,7 +137,9 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("DaemonSet",
factory.Apps().V1().DaemonSets().Informer(),
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

// Only enable ReplicaSet informer when ReplicaSet or DeploymentName extraction rule is enabled
Expand All @@ -143,7 +148,9 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("ReplicaSet",
factory.Apps().V1().ReplicaSets().Informer(),
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

// Only enable Deployment informer when Deployment extraction rule is enabled
Expand All @@ -152,7 +159,9 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("Deployment",
factory.Apps().V1().Deployments().Informer(),
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

// Only enable StatefulSet informer when StatefulSet extraction rule is enabled
Expand All @@ -161,16 +170,27 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("StatefulSet",
factory.Apps().V1().StatefulSets().Informer(),
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

// Only enable EndpointSlice informer when ServiceName extraction rule is enabled
if extractionRules.ServiceName {
logger.Debug("adding informer for Endpoint", zap.String("api_version", "v1"))
ownerCache.addOwnerInformer("Endpoint",
factory.Core().V1().Endpoints().Informer(),
ownerCache.cacheEndpoint,
ownerCache.deleteEndpoint)
logger.Debug("adding informer for EndpointSlice", zap.String("api_version", "discovery.k8s.io/v1"))
ownerCache.addOwnerInformer("EndpointSlice",
factory.Discovery().V1().EndpointSlices().Informer(),
ownerCache.cacheEndpointSlice,
ownerCache.deleteEndpointSlice,
func(object interface{}) (interface{}, error) {
originalES, success := object.(*discovery_v1.EndpointSlice)
if !success {
return object.(cache.DeletedFinalStateUnknown), nil
} else {
return removeUnnecessaryEndpointSliceData(originalES), nil
}
},
)
}

// Only enable Job informer when Job or CronJob extraction rule is enabled
Expand All @@ -179,7 +199,9 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("Job",
factory.Batch().V1().Jobs().Informer(),
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

// Only enable CronJob informer when CronJob extraction rule is enabled
Expand All @@ -199,7 +221,9 @@ func newOwnerProvider(
ownerCache.addOwnerInformer("CronJob",
informer,
ownerCache.cacheObject,
ownerCache.deleteObject)
ownerCache.deleteObject,
nil,
)
}

handleAPIResources := func(informer cache.SharedIndexInformer, apiResources []meta_v1.APIResource) bool {
Expand Down Expand Up @@ -322,6 +346,7 @@ func (op *OwnerCache) addOwnerInformer(
informer cache.SharedIndexInformer,
cacheFunc func(kind string, obj interface{}),
deleteFunc func(obj interface{}),
transformFunc cache.TransformFunc,
) {
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -338,7 +363,13 @@ func (op *OwnerCache) addOwnerInformer(
}),
})
if err != nil {
op.logger.Error("error adding event handler to namespace informer", zap.Error(err))
op.logger.Error("error adding event handler to owner informer", zap.Error(err), zap.String("kind", kind))
}

if transformFunc != nil {
if err = informer.SetTransform(transformFunc); err != nil {
op.logger.Error("error adding transform to owner informer", zap.Error(err), zap.String("kind", kind))
}
}

op.informers = append(op.informers, informer)
Expand Down Expand Up @@ -392,30 +423,30 @@ func (op *OwnerCache) cacheObject(kind string, obj interface{}) {
op.ownersMutex.Unlock()
}

func (op *OwnerCache) addEndpointToPod(pod string, endpoint string) {
func (op *OwnerCache) addServiceToPod(pod string, serviceName string) {
op.podServicesMutex.Lock()
defer op.podServicesMutex.Unlock()

services, ok := op.podServices[pod]
if !ok {
// If there's no services/endpoints for a given pod then just update the cache
// with the provided endpoint.
op.podServices[pod] = []string{endpoint}
op.podServices[pod] = []string{serviceName}
return
}

for _, it := range services {
if it == endpoint {
if it == serviceName {
return
}
}

services = append(services, endpoint)
services = append(services, serviceName)
sort.Strings(services)
op.podServices[pod] = services
}

func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) {
func (op *OwnerCache) deleteServiceFromPod(pod string, serviceName string) {
op.podServicesMutex.Lock()
defer op.podServicesMutex.Unlock()

Expand All @@ -426,7 +457,7 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) {

for i := 0; len(services) > 0; {
service := services[i]
if service == endpoint {
if service == serviceName {
// Remove the ith entry by...
l := len(services)
last := services[l-1]
Expand All @@ -451,50 +482,45 @@ func (op *OwnerCache) deleteEndpointFromPod(pod string, endpoint string) {
}
}

func (op *OwnerCache) genericEndpointOp(obj interface{}, endpointFunc func(pod string, endpoint string)) {
var ep *api_v1.Endpoints

func (op *OwnerCache) genericEndpointSliceOp(obj interface{}, serviceFunc func(pod string, serviceName string)) {
var endpointSlice *discovery_v1.EndpointSlice
switch obj := obj.(type) {
case *api_v1.Endpoints:
ep = obj
case *discovery_v1.EndpointSlice:
endpointSlice = obj
case cache.DeletedFinalStateUnknown:
prev, ok := obj.Obj.(*api_v1.Endpoints)
prev, ok := obj.Obj.(*discovery_v1.EndpointSlice)
if !ok {
op.logger.Error(
"object received was DeletedFinalStateUnknown but did not contain api_v1.Endpoints",
"object received was DeletedFinalStateUnknown but did not contain EndpointSlice",
zap.Any("received", obj),
)
return
}
ep = prev
endpointSlice = prev
default:
op.logger.Error(
"object received was not of type api_v1.Endpoints",
"object received was not of type EndpointSlice",
zap.Any("received", obj),
)
return
}

for _, it := range ep.Subsets {
for _, addr := range it.Addresses {
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
endpointFunc(addr.TargetRef.Name, ep.Name)
}
}
for _, addr := range it.NotReadyAddresses {
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
endpointFunc(addr.TargetRef.Name, ep.Name)
}
epLabels := endpointSlice.GetLabels()
serviceName := epLabels[endpointSliceServiceLabel] // see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership

for _, endpoint := range endpointSlice.Endpoints {
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
serviceFunc(endpoint.TargetRef.Name, serviceName)
}
}
}

func (op *OwnerCache) deleteEndpoint(obj interface{}) {
op.genericEndpointOp(obj, op.deleteEndpointFromPod)
// deleteEndpointSlice is the informer delete handler for EndpointSlices.
// It hands obj to genericEndpointSliceOp together with deleteServiceFromPod,
// so the slice's Service name is removed from every Pod the slice's
// endpoints reference.
func (op *OwnerCache) deleteEndpointSlice(obj interface{}) {
op.genericEndpointSliceOp(obj, op.deleteServiceFromPod)
}

func (op *OwnerCache) cacheEndpoint(kind string, obj interface{}) {
op.genericEndpointOp(obj, op.addEndpointToPod)
// cacheEndpointSlice is the informer cache handler for EndpointSlices.
// It hands obj to genericEndpointSliceOp together with addServiceToPod,
// so the slice's Service name is recorded for every Pod the slice's
// endpoints reference.
// The kind argument is not used here; it is present so the method matches
// the cacheFunc signature expected by addOwnerInformer.
func (op *OwnerCache) cacheEndpointSlice(kind string, obj interface{}) {
op.genericEndpointSliceOp(obj, op.addServiceToPod)
}

// GetNamespaces returns a cached namespace object (if one is found) or nil otherwise
Expand Down Expand Up @@ -599,3 +625,30 @@ type ownerCacheEviction struct {
ts time.Time
evict func()
}

// removeUnnecessaryEndpointSliceData returns a new EndpointSlice that carries
// only the data the owner cache reads, so that the informer store does not
// have to hold complete EndpointSlice objects.
//
// The returned object keeps:
//   - the name and namespace (needed by the informer store),
//   - the endpointSliceServiceLabel label (used to look up the Service name),
//   - one Endpoint per input endpoint whose TargetRef points at a Pod, with
//     only the TargetRef field populated.
//
// Endpoints that do not target a Pod are dropped entirely instead of being
// kept as zero-valued placeholders: they are never read and would only
// consume memory.
//
// The input object is not modified. The TargetRef pointers are shared with
// the input object.
func removeUnnecessaryEndpointSliceData(endpointSlice *discovery_v1.EndpointSlice) *discovery_v1.EndpointSlice {
	// name and namespace are needed by the informer store
	transformedEndpointSlice := discovery_v1.EndpointSlice{
		ObjectMeta: meta_v1.ObjectMeta{
			Name:      endpointSlice.GetName(),
			Namespace: endpointSlice.GetNamespace(),
		},
	}

	// we need a particular label to get the Service name
	serviceName := endpointSlice.GetLabels()[endpointSliceServiceLabel]
	transformedEndpointSlice.SetLabels(map[string]string{
		endpointSliceServiceLabel: serviceName,
	})

	// Count the Pod-backed endpoints first so the slice is sized exactly;
	// sizing it by len(endpointSlice.Endpoints) would retain unused entries.
	podEndpointCount := 0
	for i := range endpointSlice.Endpoints {
		targetRef := endpointSlice.Endpoints[i].TargetRef
		if targetRef != nil && targetRef.Kind == "Pod" {
			podEndpointCount++
		}
	}

	// and for each Pod-backed endpoint, we need the targetRef
	transformedEndpointSlice.Endpoints = make([]discovery_v1.Endpoint, 0, podEndpointCount)
	for i := range endpointSlice.Endpoints {
		targetRef := endpointSlice.Endpoints[i].TargetRef
		if targetRef != nil && targetRef.Kind == "Pod" {
			transformedEndpointSlice.Endpoints = append(
				transformedEndpointSlice.Endpoints,
				discovery_v1.Endpoint{TargetRef: targetRef},
			)
		}
	}

	return &transformedEndpointSlice
}
Loading

0 comments on commit 76333dd

Please sign in to comment.