Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation webhook for KafkaUser #968

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ webhooks:
resources:
- kafkaclusters
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
caBundle: {{ $caCrt }}
service:
name: "{{ include "kafka-operator.fullname" . }}-operator"
namespace: {{ .Release.Namespace }}
path: /validate-kafka-banzaicloud-io-v1alpha1-kafkauser
failurePolicy: Fail
name: kafkausers.kafka.banzaicloud.io
rules:
- apiGroups:
- kafka.banzaicloud.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- kafkausers
sideEffects: None
---
apiVersion: v1
kind: Secret
Expand Down
12 changes: 0 additions & 12 deletions controllers/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
Expand Down Expand Up @@ -62,17 +61,6 @@ func reconciled() (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// getClusterRefNamespace returns the expected namespace for a kafka cluster
// referenced by a user/topic CR. It takes the namespace of the CR as the first
// argument and the reference itself as the second.
func getClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string {
clusterNamespace := ref.Namespace
if clusterNamespace == "" {
return ns
}
return clusterNamespace
}

// clusterLabelString returns the label value for a cluster reference
func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
return fmt.Sprintf("%s.%s", cluster.Name, cluster.Namespace)
Expand Down
15 changes: 0 additions & 15 deletions controllers/controller_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
Expand Down Expand Up @@ -68,20 +67,6 @@ func TestReconciled(t *testing.T) {
}
}

func TestGetClusterRefNamespace(t *testing.T) {
ns := testNamespace
ref := v1alpha1.ClusterReference{
Name: "test-cluster",
}
if refNS := getClusterRefNamespace(ns, ref); refNS != testNamespace {
t.Error("Expected to get 'test-namespace', got:", refNS)
}
ref.Namespace = "another-namespace"
if refNS := getClusterRefNamespace(ns, ref); refNS != "another-namespace" {
t.Error("Expected to get 'another-namespace', got:", refNS)
}
}

func TestClusterLabelString(t *testing.T) {
cluster := &v1beta1.KafkaCluster{}
cluster.Name = "test-cluster"
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile.
}

// Get the referenced kafkacluster
clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
var cluster *v1beta1.KafkaCluster
if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil {
// This shouldn't trigger anymore, but leaving it here as a safetybelt
Expand Down
6 changes: 4 additions & 2 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R
}

// Get the referenced kafkacluster
clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
var cluster *v1beta1.KafkaCluster
if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil {
// This shouldn't trigger anymore, but leaving it here as a safetybelt
Expand All @@ -197,10 +197,12 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R

if instance.Spec.GetIfCertShouldBeCreated() {
// Avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode
// TODO: refactor this and use webhook to validate if the cluster is eligible to create a kafka user
if cluster.Spec.ListenersConfig.SSLSecrets == nil && instance.Spec.PKIBackendSpec == nil {
// normally we should never see this scenario due to the KafkaUser validation webhook
// the only edge case is when cluster.Spec.ListenersConfig.SSLSecrets is set to nil at the beginning of this Reconcile flow
return requeueWithError(reqLogger, "could not create kafka user since user specific PKI not configured", errors.New("failed to create kafka user"))
}

var backend v1beta1.PKIBackend
if instance.Spec.PKIBackendSpec != nil {
backend = v1beta1.PKIBackend(instance.Spec.PKIBackendSpec.PKIBackend)
Expand Down
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func main() {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaCluster")
os.Exit(1)
}

err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaTopic{}).
WithValidator(webhooks.KafkaTopicValidator{
Client: mgr.GetClient(),
Expand All @@ -237,6 +238,17 @@ func main() {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaTopic")
os.Exit(1)
}

err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaUser{}).
WithValidator(webhooks.KafkaUserValidator{
Client: mgr.GetClient(),
Log: mgr.GetLogger().WithName("webhooks").WithName("KafkaUser"),
}).
Complete()
if err != nil {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaUser")
os.Exit(1)
}
}

// +kubebuilder:scaffold:builder
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,14 @@ func RetryOnError(backoff wait.Backoff, fn func() error, isRetryableError func(e
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return RetryOnError(backoff, fn, apierrors.IsConflict)
}

// GetClusterRefNamespace returns the expected namespace for a kafka cluster
// referenced by a user/topic CR. It takes the namespace of the CR as the first
// argument and the reference itself as the second.
func GetClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string {
clusterNamespace := ref.Namespace
if clusterNamespace == "" {
return ns
}
return clusterNamespace
}
15 changes: 15 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"reflect"
"testing"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/util/istioingress"

Expand Down Expand Up @@ -686,3 +687,17 @@ cruise.control.metrics.reporter.kubernetes.mode=true`,
}
}
}

func TestGetClusterRefNamespace(t *testing.T) {
const testNamespace = "test-namespace"
ref := v1alpha1.ClusterReference{
Name: "test-cluster",
}
if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != testNamespace {
t.Error("Expected to get 'test-namespace', got:", refNS)
}
ref.Namespace = "another-namespace"
if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != "another-namespace" {
t.Error("Expected to get 'another-namespace', got:", refNS)
}
}
6 changes: 3 additions & 3 deletions pkg/webhooks/kafkatopic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,20 @@ func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, log logr.L
log.Info("Deleted as a result of a cluster deletion")
return nil, nil
}
logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace)
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg))
// retrun is needed here because later this cluster is used for further checks but it is nil
return allErrs, nil
}
if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) {
// Let this through, it's a delete topic request from a parent cluster being deleted
log.Info("Cluster is going down for deletion, assuming a delete topic request")
log.Info("Referenced KafkaCluster CR is being deleted, assuming a delete topic request")
return nil, nil
}

if util.ObjectManagedByClusterRegistry(cluster) {
// referencing remote Kafka clusters is not allowed
logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' is a remote kafka cluster", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace)
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg))
}

Expand Down
117 changes: 117 additions & 0 deletions pkg/webhooks/kafkauser_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhooks

import (
"context"
"fmt"

"emperror.dev/errors"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/client"

banzaicloudv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/k8sutil"
"github.com/banzaicloud/koperator/pkg/util"
)

type KafkaUserValidator struct {
Client client.Client
Log logr.Logger
}

// ValidateCreate validates if the user-provided KafkaUser specifications valid for creation
func (validator KafkaUserValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error {
return validator.validate(ctx, obj)
}

// ValidateUpdate validates if the user-provided KafkaUser specifications valid for update
func (validator KafkaUserValidator) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) error {
return validator.validate(ctx, newObj)
}

// ValidateDeleet validates if the user-selctd KafkaUser specifications valid for deletion
func (validator KafkaUserValidator) ValidateDelete(_ context.Context, _ runtime.Object) error {
// nothing to validate on deletion
return nil
}

func (validator KafkaUserValidator) validate(ctx context.Context, obj runtime.Object) error {
var (
kafkaUser = obj.(*banzaicloudv1alpha1.KafkaUser)
log = validator.Log.WithValues("name", kafkaUser.GetName(), "namespace", kafkaUser.GetNamespace())
)

fieldErrs, err := validator.validateKafkaUser(ctx, kafkaUser)
if err != nil {
log.Error(err, errorDuringValidationMsg)
return apierrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
}

if len(fieldErrs) == 0 {
return nil
}

log.Info("rejected", "invalid field(s)", fieldErrs.ToAggregate().Error())

return apierrors.NewInvalid(kafkaUser.GetObjectKind().GroupVersionKind().GroupKind(), kafkaUser.Name, fieldErrs)
}

func (validator KafkaUserValidator) validateKafkaUser(ctx context.Context, kafkaUser *banzaicloudv1alpha1.KafkaUser) (field.ErrorList, error) {
var (
err error
allErrs field.ErrorList
logMsg string

kafkaCluster *v1beta1.KafkaCluster
clusterNamespace = util.GetClusterRefNamespace(kafkaUser.GetNamespace(), kafkaUser.Spec.ClusterRef)
clusterName = kafkaUser.Spec.ClusterRef.Name
)

if kafkaCluster, err = k8sutil.LookupKafkaCluster(ctx, validator.Client, clusterName, clusterNamespace); err != nil {
if apierrors.IsNotFound(err) {
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
return allErrs, nil
}

return nil, err
}

if k8sutil.IsMarkedForDeletion(kafkaCluster.ObjectMeta) {
return nil, errors.New("referenced KafkaCluster CR is being deleted")
}

if util.ObjectManagedByClusterRegistry(kafkaCluster) {
// referencing remote Kafka clusters is not allowed
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
}
Comment on lines +101 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Won't this cause issues with clreg replicated KafkaUsers from remote K8s clusters and Kafka clusters?
I didn't see any guard against validating KafkaUsers replicated by clreg.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm yea you are right, we probably shouldn't have this restriction here


if kafkaUser.Spec.GetIfCertShouldBeCreated() {
// avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode
if kafkaCluster.Spec.ListenersConfig.SSLSecrets == nil && kafkaUser.Spec.PKIBackendSpec == nil {
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in namespace '%s' is in plaintext mode, "+
"therefore 'spec.pkiBackendSpec' must be provided to create certificate", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("pkiBackendSpec"), kafkaUser.Spec.PKIBackendSpec, logMsg))
}
}
Comment on lines +108 to +114
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to myself: This check is not validate against a Kafka cluster within a Istio mesh since the broker certs are issued by Istio, not Koperator


return allErrs, nil
}
Loading