Skip to content

Commit

Permalink
[k8sobserver] enable the observation of ingress objects (#35539)
Browse files Browse the repository at this point in the history
**Description:** This PR fixes the observation of ingress objects by the
k8sobserver. The issue was that the `observe_ingresses` option was not
considered in the extension setup, and no informer for the ingress
resource type has been created

**Link to tracking Issue:** #35324 

**Testing:** Added unit tests. Also did manual testing against a k8s
cluster to check whether ingress objects are picked up as expected

**Documentation:** No change there, as the `observe_ingresses` option
was already documented in the readme

---------

Signed-off-by: Florian Bacher <[email protected]>
Co-authored-by: Moritz Wiesinger <[email protected]>
  • Loading branch information
bacherfl and mowies authored Nov 3, 2024
1 parent 8dd2b7b commit 908ccf2
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8s-observer-ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobserver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable observation of ingress objects if the `ObserveIngresses` config option is set to true

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35324]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
18 changes: 18 additions & 0 deletions extension/observer/k8sobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"

Expand All @@ -29,6 +30,7 @@ type k8sObserver struct {
podListerWatcher cache.ListerWatcher
serviceListerWatcher cache.ListerWatcher
nodeListerWatcher cache.ListerWatcher
ingressListerWatcher cache.ListerWatcher
handler *handler
once *sync.Once
stop chan struct{}
Expand Down Expand Up @@ -69,6 +71,14 @@ func (k *k8sObserver) Start(_ context.Context, _ component.Host) error {
k.telemetry.Logger.Error("error adding event handler to node informer", zap.Error(err))
}
}
if k.ingressListerWatcher != nil {
k.telemetry.Logger.Debug("creating and starting ingress informer")
ingressInformer := cache.NewSharedInformer(k.ingressListerWatcher, &networkingv1.Ingress{}, 0)
go ingressInformer.Run(k.stop)
if _, err := ingressInformer.AddEventHandler(k.handler); err != nil {
k.telemetry.Logger.Error("error adding event handler to ingress informer", zap.Error(err))
}
}
})
return nil
}
Expand Down Expand Up @@ -117,13 +127,21 @@ func newObserver(config *Config, set extension.Settings) (extension.Extension, e
set.Logger.Debug("observing nodes")
nodeListerWatcher = cache.NewListWatchFromClient(restClient, "nodes", v1.NamespaceAll, nodeSelector)
}

var ingressListerWatcher cache.ListerWatcher
if config.ObserveIngresses {
var ingressSelector = fields.Everything()
set.Logger.Debug("observing ingresses")
ingressListerWatcher = cache.NewListWatchFromClient(client.NetworkingV1().RESTClient(), "ingresses", v1.NamespaceAll, ingressSelector)
}
h := &handler{idNamespace: set.ID.String(), endpoints: &sync.Map{}, logger: set.TelemetrySettings.Logger}
obs := &k8sObserver{
EndpointsWatcher: observer.NewEndpointsWatcher(h, time.Second, set.TelemetrySettings.Logger),
telemetry: set.TelemetrySettings,
podListerWatcher: podListerWatcher,
serviceListerWatcher: serviceListerWatcher,
nodeListerWatcher: nodeListerWatcher,
ingressListerWatcher: ingressListerWatcher,
stop: make(chan struct{}),
config: config,
handler: h,
Expand Down
85 changes: 85 additions & 0 deletions extension/observer/k8sobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,88 @@ func TestExtensionObserveNodes(t *testing.T) {
require.NoError(t, ext.Shutdown(context.Background()))
obs.StopListAndWatch()
}

func TestExtensionObserveIngresses(t *testing.T) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
config.ObservePods = false // avoid causing data race when multiple test cases running in the same process using podListerWatcher
config.ObserveIngresses = true
mockServiceHost(t, config)

set := extensiontest.NewNopSettings()
set.ID = component.NewID(metadata.Type)
ext, err := newObserver(config, set)
require.NoError(t, err)
require.NotNil(t, ext)

obs := ext.(*k8sObserver)
ingressListerWatcher := framework.NewFakeControllerSource()
obs.ingressListerWatcher = ingressListerWatcher

ingressListerWatcher.Add(ingress)

require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))

sink := &endpointSink{}
obs.ListAndWatch(sink)

requireSink(t, sink, func() bool {
return len(sink.added) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
UID: "k8s_observer/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "prod"},
Namespace: "default",
Scheme: "https",
Host: "host-1",
Path: "/",
},
}, sink.added[0])

ingressListerWatcher.Modify(ingressV2)

requireSink(t, sink, func() bool {
return len(sink.changed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
UID: "k8s_observer/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "hardening"},
Namespace: "default",
Scheme: "https",
Host: "host-1",
Path: "/",
},
}, sink.changed[0])

ingressListerWatcher.Delete(ingressV2)

requireSink(t, sink, func() bool {
return len(sink.removed) == 1
})

assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/ingress-1-UID/host-1/",
Target: "https://host-1/",
Details: &observer.K8sIngress{
Name: "application-ingress",
UID: "k8s_observer/ingress-1-UID/host-1/",
Labels: map[string]string{"env": "hardening"},
Namespace: "default",
Scheme: "https",
Host: "host-1",
Path: "/",
},
}, sink.removed[0])

require.NoError(t, ext.Shutdown(context.Background()))
}
6 changes: 6 additions & 0 deletions extension/observer/k8sobserver/k8s_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ var ingress = &networkingv1.Ingress{
},
}

var ingressV2 = func() *networkingv1.Ingress {
i2 := ingress.DeepCopy()
i2.Labels["env"] = "hardening"
return i2
}()

var ingressMultipleHost = &networkingv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Expand Down

0 comments on commit 908ccf2

Please sign in to comment.