Skip to content

Commit

Permalink
fix(k8sprocessor): Pod Service cache invalidation
Browse files Browse the repository at this point in the history
  • Loading branch information
swiatekm committed Jan 12, 2024
1 parent 4201527 commit 8c7502f
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 35 deletions.
1 change: 1 addition & 0 deletions .changelog/1425.fixed.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix(k8sprocessor): Pod Service cache invalidation
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
go.opentelemetry.io/collector v0.91.0
go.opentelemetry.io/collector/semconv v0.91.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc
k8s.io/api v0.28.4
k8s.io/apimachinery v0.28.4
k8s.io/client-go v0.28.4
Expand Down
5 changes: 3 additions & 2 deletions pkg/processor/k8sprocessor/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc h1:ao2WRsKSzW6KuUY9IWPwWahcHCgR0s52IfwutMfEbdM=
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -686,7 +687,7 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
96 changes: 67 additions & 29 deletions pkg/processor/k8sprocessor/kube/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.uber.org/zap"
"golang.org/x/exp/slices"
api_v1 "k8s.io/api/core/v1"
discovery_v1 "k8s.io/api/discovery/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -139,6 +140,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand All @@ -150,6 +152,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand All @@ -161,6 +164,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand All @@ -172,6 +176,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand All @@ -182,6 +187,7 @@ func newOwnerProvider(
factory.Discovery().V1().EndpointSlices().Informer(),
ownerCache.cacheEndpointSlice,
ownerCache.deleteEndpointSlice,
ownerCache.updateEndpointSlice,
func(object interface{}) (interface{}, error) {
originalES, success := object.(*discovery_v1.EndpointSlice)
if !success {
Expand All @@ -201,6 +207,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand All @@ -223,6 +230,7 @@ func newOwnerProvider(
ownerCache.cacheObject,
ownerCache.deleteObject,
nil,
nil,
)
}

Expand Down Expand Up @@ -344,17 +352,25 @@ func (op *OwnerCache) deferredDelete(evict func(obj any)) func(any) {
func (op *OwnerCache) addOwnerInformer(
kind string,
informer cache.SharedIndexInformer,
cacheFunc func(kind string, obj interface{}),
addFunc func(kind string, obj interface{}),
deleteFunc func(obj interface{}),
updateFunc func(oldObj, newObj interface{}),
transformFunc cache.TransformFunc,
) {
// if updatefunc is not specified, use addFunc
if updateFunc == nil {
updateFunc = func(_, obj interface{}) {
addFunc(kind, obj)
observability.RecordOtherUpdated()
}
}
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cacheFunc(kind, obj)
addFunc(kind, obj)
observability.RecordOtherAdded()
},
UpdateFunc: func(_, obj interface{}) {
cacheFunc(kind, obj)
UpdateFunc: func(oldObj, newObj interface{}) {
updateFunc(oldObj, newObj)
observability.RecordOtherUpdated()
},
DeleteFunc: op.deferredDelete(func(obj any) {
Expand Down Expand Up @@ -435,10 +451,8 @@ func (op *OwnerCache) addServiceToPod(pod string, serviceName string) {
return
}

for _, it := range services {
if it == serviceName {
return
}
if idx := slices.Index(services, serviceName); idx >= 0 {
return
}

services = append(services, serviceName)
Expand All @@ -455,24 +469,9 @@ func (op *OwnerCache) deleteServiceFromPod(pod string, serviceName string) {
return
}

for i := 0; len(services) > 0; {
service := services[i]
if service == serviceName {
// Remove the ith entry by...
l := len(services)
last := services[l-1]
// ...moving it at the very end (swapping it with the last entry)...
services[l-1], services[i] = service, last
// ... and by truncating the slice by one elem
services = services[:l-1]
} else {
i++
}

if i == len(services)-1 {
break
}
}
services = slices.DeleteFunc(services, func(s string) bool {
return s == serviceName
})

if len(services) == 0 {
delete(op.podServices, pod)
Expand Down Expand Up @@ -505,12 +504,43 @@ func (op *OwnerCache) genericEndpointSliceOp(obj interface{}, serviceFunc func(p
return
}

epLabels := endpointSlice.GetLabels()
serviceName := epLabels[endpointSliceServiceLabel] // see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership
serviceName := getServiceName(endpointSlice)

for _, endpoint := range endpointSlice.Endpoints {
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
serviceFunc(endpoint.TargetRef.Name, serviceName)
podName := endpoint.TargetRef.Name
serviceFunc(podName, serviceName)
}
}
}

func (op *OwnerCache) updateEndpointSlice(oldObj interface{}, newObj interface{}) {
// for updates, we're guaranteed the objects will be the right type
oldEndpointSlice := oldObj.(*discovery_v1.EndpointSlice)
newEndpointSlice := newObj.(*discovery_v1.EndpointSlice)

// add the new endpointslice first, the logic is the same
op.cacheEndpointSlice("EndpointSlice", newObj)

// we also need to remove the Service from Pods which were deleted from the endpointslice
serviceName := getServiceName(newEndpointSlice)

newPodNames := []string{}
for _, endpoint := range newEndpointSlice.Endpoints {
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
podName := endpoint.TargetRef.Name
newPodNames = append(newPodNames, podName)
}
}

// for each Pod name which was in the old slice, but not in the new slice, schedule a delete
for _, endpoint := range oldEndpointSlice.Endpoints {
if endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod" {
podName := endpoint.TargetRef.Name
if slices.Index(newPodNames, podName) == -1 {
// not a deferred delete, as this is a dynamic property which can change often
op.deleteServiceFromPod(podName, serviceName)
}
}
}
}
Expand Down Expand Up @@ -652,3 +682,11 @@ func removeUnnecessaryEndpointSliceData(endpointSlice *discovery_v1.EndpointSlic

return &transformedEndpointSlice
}

// Get the Service name from an EndpointSlice based on a standard label.
// see: https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#ownership
func getServiceName(endpointSlice *discovery_v1.EndpointSlice) string {
epLabels := endpointSlice.GetLabels()
serviceName := epLabels[endpointSliceServiceLabel]
return serviceName
}
19 changes: 15 additions & 4 deletions pkg/processor/k8sprocessor/kube/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func waitForWatchToBeEstablished(client *fake.Clientset, resource string) <-chan
gvr := action.GetResource()
ns := action.GetNamespace()

watch, err := client.Tracker().Watch(gvr, ns)
watcher, err := client.Tracker().Watch(gvr, ns)
if err != nil {
return false, nil, err
}

if action.GetVerb() == "watch" {
close(ch)
}
return true, watch, nil
return true, watcher, nil
})
return ch
}
Expand Down Expand Up @@ -569,7 +569,6 @@ func Test_OwnerProvider_GetServices(t *testing.T) {
})

t.Run("updating endpoints", func(t *testing.T) {
t.Skip("Known bug, see https://github.com/SumoLogic/sumologic-otel-collector/issues/1414")
_, err = c.DiscoveryV1().EndpointSlices(namespace).
Update(context.Background(), endpointSlice2Updated, metav1.UpdateOptions{})
require.NoError(t, err)
Expand All @@ -579,8 +578,20 @@ func Test_OwnerProvider_GetServices(t *testing.T) {
t.Logf("services: %v", services)
return false
}
return assert.Equal(t, []string{"my-service"}, services)
}, 5*time.Second, 10*time.Millisecond)

return len(services) == 1
// update back to the original value
_, err = c.DiscoveryV1().EndpointSlices(namespace).
Update(context.Background(), endpointSlice2, metav1.UpdateOptions{})
require.NoError(t, err)
assert.Eventually(t, func() bool {
services := op.GetServices(pod.Name)
if len(services) != 2 {
t.Logf("services: %v", services)
return false
}
return assert.Equal(t, []string{"my-service", "my-service-2"}, services)
}, 5*time.Second, 10*time.Millisecond)
})

Expand Down

0 comments on commit 8c7502f

Please sign in to comment.