Skip to content

Commit

Permalink
Using the composite prober for the sink (#3267)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rahul-Kumar-prog authored Sep 8, 2023
1 parent a605869 commit 1e36978
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
25 changes: 21 additions & 4 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package sink

import (
"context"
"net/http"

"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
Expand Down Expand Up @@ -74,11 +74,18 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
logger.Warn("failed to get CA certs when at least one address uses TLS", zap.Error(err))
}
impl := sinkreconciler.NewImpl(ctx, reconciler)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: configs.SystemNamespace, Name: configs.IngressName})
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, configs.IngressPodPort, IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)

reconciler.Prober, err = prober.NewComposite(ctx, configs.IngressPodPort, configs.IngressPodTlsPort, IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}
sinkInformer := sinkinformer.Get(ctx)

sinkInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -101,6 +108,16 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env

reconciler.Tracker = impl.Tracker

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
globalResync(obj)
}

secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(
// Call the tracker's OnChanged method, but we've seen the objects
// coming through this path missing TypeMeta, so ensure it is properly
Expand All @@ -117,7 +134,7 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env

secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(sinkIngressTLSSecretName),
Handler: controller.HandleAll(globalResync),
Handler: controller.HandleAll(rotateCACerts),
})

return impl
Expand Down
16 changes: 9 additions & 7 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Reconciler struct {
// mock the function used during the reconciliation loop.
NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc

Prober prober.Prober
Prober prober.NewProber

IngressHost string
}
Expand Down Expand Up @@ -277,9 +277,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()
proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: ks.GetNamespace(),
Name: ks.GetName(),
Expand Down Expand Up @@ -349,9 +348,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, ks *eventing.KafkaSink) e
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, ks)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, ks)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: ks.GetNamespace(),
Name: ks.GetName(),
Expand Down
18 changes: 9 additions & 9 deletions control-plane/pkg/reconciler/sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
},
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -1140,7 +1140,7 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
},
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -1359,7 +1359,7 @@ func sinkFinalization(t *testing.T, format string, env config.Env) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func sinkFinalization(t *testing.T, format string, env config.Env) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -1461,7 +1461,7 @@ func sinkFinalization(t *testing.T, format string, env config.Env) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -1513,7 +1513,7 @@ func sinkFinalization(t *testing.T, format string, env config.Env) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -1579,7 +1579,7 @@ func sinkFinalization(t *testing.T, format string, env config.Env) {
OtherTestData: map[string]interface{}{
wantErrorOnDeleteTopic: errDeleteTopic,
wantTopicName: "topic-2",
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
}
Expand Down Expand Up @@ -1645,9 +1645,9 @@ func useTable(t *testing.T, table TableTest, env *config.Env) {
errorOnDescribeTopics = isPresentError.(error)
}

proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

reconciler := &sink.Reconciler{
Expand Down

0 comments on commit 1e36978

Please sign in to comment.