From fc33950dbd808426286906f3288050095e77a9f5 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Wed, 10 May 2023 21:53:26 -0400 Subject: [PATCH 1/3] Add validation webhook for KafkaUser --- .../operator-deployment-with-webhook.yaml | 21 ++ controllers/controller_common.go | 12 - controllers/controller_common_test.go | 15 -- controllers/kafkatopic_controller.go | 2 +- controllers/kafkauser_controller.go | 7 +- main.go | 12 + pkg/util/util.go | 11 + pkg/util/util_test.go | 15 ++ pkg/webhooks/kafkatopic_validator.go | 6 +- pkg/webhooks/kafkauser_validator.go | 117 +++++++++ pkg/webhooks/kafkauser_validator_test.go | 222 ++++++++++++++++++ 11 files changed, 403 insertions(+), 37 deletions(-) create mode 100644 pkg/webhooks/kafkauser_validator.go create mode 100644 pkg/webhooks/kafkauser_validator_test.go diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 3bb05c3bf..f37ee65fb 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -72,6 +72,27 @@ webhooks: resources: - kafkaclusters sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + caBundle: {{ $caCrt }} + service: + name: "{{ include "kafka-operator.fullname" . }}-operator" + namespace: {{ .Release.Namespace }} + path: /validate-kafka-banzaicloud-io-v1alpha1-kafkauser + failurePolicy: Fail + name: kafkausers.kafka.banzaicloud.io + rules: + - apiGroups: + - kafka.banzaicloud.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - kafkausers + sideEffects: None --- apiVersion: v1 kind: Secret diff --git a/controllers/controller_common.go b/controllers/controller_common.go index c86d8810b..acc56f7af 100644 --- a/controllers/controller_common.go +++ b/controllers/controller_common.go @@ -27,7 +27,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/kafkaclient" @@ -62,17 +61,6 @@ func reconciled() (ctrl.Result, error) { return ctrl.Result{}, nil } -// getClusterRefNamespace returns the expected namespace for a kafka cluster -// referenced by a user/topic CR. It takes the namespace of the CR as the first -// argument and the reference itself as the second. 
-func getClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string { - clusterNamespace := ref.Namespace - if clusterNamespace == "" { - return ns - } - return clusterNamespace -} - // clusterLabelString returns the label value for a cluster reference func clusterLabelString(cluster *v1beta1.KafkaCluster) string { return fmt.Sprintf("%s.%s", cluster.Name, cluster.Namespace) diff --git a/controllers/controller_common_test.go b/controllers/controller_common_test.go index 729751feb..9ad0b58ad 100644 --- a/controllers/controller_common_test.go +++ b/controllers/controller_common_test.go @@ -25,7 +25,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" "github.com/banzaicloud/koperator/pkg/kafkaclient" @@ -68,20 +67,6 @@ func TestReconciled(t *testing.T) { } } -func TestGetClusterRefNamespace(t *testing.T) { - ns := testNamespace - ref := v1alpha1.ClusterReference{ - Name: "test-cluster", - } - if refNS := getClusterRefNamespace(ns, ref); refNS != testNamespace { - t.Error("Expected to get 'test-namespace', got:", refNS) - } - ref.Namespace = "another-namespace" - if refNS := getClusterRefNamespace(ns, ref); refNS != "another-namespace" { - t.Error("Expected to get 'another-namespace', got:", refNS) - } -} - func TestClusterLabelString(t *testing.T) { cluster := &v1beta1.KafkaCluster{} cluster.Name = "test-cluster" diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 496c36fbe..8cbcb50da 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -91,7 +91,7 @@ func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile. 
} // Get the referenced kafkacluster - clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef) + clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef) var cluster *v1beta1.KafkaCluster if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil { // This shouldn't trigger anymore, but leaving it here as a safetybelt diff --git a/controllers/kafkauser_controller.go b/controllers/kafkauser_controller.go index ffe41e3fe..3bd0863d3 100644 --- a/controllers/kafkauser_controller.go +++ b/controllers/kafkauser_controller.go @@ -179,7 +179,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R } // Get the referenced kafkacluster - clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef) + clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef) var cluster *v1beta1.KafkaCluster if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil { // This shouldn't trigger anymore, but leaving it here as a safetybelt @@ -196,11 +196,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R var kafkaUser string if instance.Spec.GetIfCertShouldBeCreated() { - // Avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode - // TODO: refactor this and use webhook to validate if the cluster is eligible to create a kafka user - if cluster.Spec.ListenersConfig.SSLSecrets == nil && instance.Spec.PKIBackendSpec == nil { - return requeueWithError(reqLogger, "could not create kafka user since user specific PKI not configured", errors.New("failed to create kafka user")) - } var backend v1beta1.PKIBackend if instance.Spec.PKIBackendSpec != nil { backend = v1beta1.PKIBackend(instance.Spec.PKIBackendSpec.PKIBackend) diff --git a/main.go b/main.go index 8f821677f..8c764ff67 100644 --- a/main.go +++ b/main.go @@ -226,6 +226,7 @@ func main() { setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaCluster") os.Exit(1) } + err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaTopic{}). WithValidator(webhooks.KafkaTopicValidator{ Client: mgr.GetClient(), @@ -237,6 +238,17 @@ func main() { setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaTopic") os.Exit(1) } + + err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaUser{}). + WithValidator(webhooks.KafkaUserValidator{ + Client: mgr.GetClient(), + Log: mgr.GetLogger().WithName("webhooks").WithName("KafkaUser"), + }). + Complete() + if err != nil { + setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaUser") + os.Exit(1) + } } // +kubebuilder:scaffold:builder diff --git a/pkg/util/util.go b/pkg/util/util.go index 7f637ecb7..7321483c0 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -548,3 +548,14 @@ func RetryOnError(backoff wait.Backoff, fn func() error, isRetryableError func(e func RetryOnConflict(backoff wait.Backoff, fn func() error) error { return RetryOnError(backoff, fn, apierrors.IsConflict) } + +// GetClusterRefNamespace returns the expected namespace for a kafka cluster +// referenced by a user/topic CR. It takes the namespace of the CR as the first +// argument and the reference itself as the second. 
+func GetClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string { + clusterNamespace := ref.Namespace + if clusterNamespace == "" { + return ns + } + return clusterNamespace +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 8add02c4a..9da66f1ca 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/util/istioingress" @@ -686,3 +687,17 @@ cruise.control.metrics.reporter.kubernetes.mode=true`, } } } + +func TestGetClusterRefNamespace(t *testing.T) { + const testNamespace = "test-namespace" + ref := v1alpha1.ClusterReference{ + Name: "test-cluster", + } + if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != testNamespace { + t.Error("Expected to get 'test-namespace', got:", refNS) + } + ref.Namespace = "another-namespace" + if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != "another-namespace" { + t.Error("Expected to get 'another-namespace', got:", refNS) + } +} diff --git a/pkg/webhooks/kafkatopic_validator.go b/pkg/webhooks/kafkatopic_validator.go index c0c509326..eda6d0610 100644 --- a/pkg/webhooks/kafkatopic_validator.go +++ b/pkg/webhooks/kafkatopic_validator.go @@ -110,20 +110,20 @@ func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, log logr.L log.Info("Deleted as a result of a cluster deletion") return nil, nil } - logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace) + logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace) allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg)) // retrun is needed here because later this cluster is used for further checks but it is nil return allErrs, nil } if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) { // Let this through, it's a delete topic request from a parent cluster being deleted - log.Info("Cluster is going down for deletion, assuming a delete topic request") + log.Info("Referenced KafkaCluster CR is being deleted, assuming a delete topic request") return nil, nil } if util.ObjectManagedByClusterRegistry(cluster) { // referencing remote Kafka clusters is not allowed - logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' is a remote kafka cluster", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace) + logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace) allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg)) } diff --git a/pkg/webhooks/kafkauser_validator.go b/pkg/webhooks/kafkauser_validator.go new file mode 100644 index 000000000..a2c821f51 --- /dev/null +++ b/pkg/webhooks/kafkauser_validator.go @@ -0,0 +1,117 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package webhooks
+
+import (
+	"context"
+	"fmt"
+
+	"emperror.dev/errors"
+	"github.com/go-logr/logr"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/validation/field"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	banzaicloudv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
+	"github.com/banzaicloud/koperator/api/v1beta1"
+	"github.com/banzaicloud/koperator/pkg/k8sutil"
+	"github.com/banzaicloud/koperator/pkg/util"
+)
+
+type KafkaUserValidator struct {
+	Client client.Client
+	Log    logr.Logger
+}
+
+// ValidateCreate validates whether the user-provided KafkaUser specification is valid for creation
+func (validator KafkaUserValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error {
+	return validator.validate(ctx, obj)
+}
+
+// ValidateUpdate validates whether the user-provided KafkaUser specification is valid for update
+func (validator KafkaUserValidator) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) error {
+	return validator.validate(ctx, newObj)
+}
+
+// ValidateDelete validates whether the user-selected KafkaUser specification is valid for deletion
+func (validator KafkaUserValidator) ValidateDelete(_ context.Context, _ runtime.Object) error {
+	// nothing to validate on deletion
+	return nil
+}
+
+func (validator KafkaUserValidator) validate(ctx context.Context, obj runtime.Object) error {
+	var (
+		kafkaUser = obj.(*banzaicloudv1alpha1.KafkaUser)
+		log       = validator.Log.WithValues("name", kafkaUser.GetName(), "namespace", kafkaUser.GetNamespace())
+	)
+
+	fieldErrs, err := validator.validateKafkaUser(ctx, kafkaUser)
+	if err != nil {
+		log.Error(err, errorDuringValidationMsg)
+		return apierrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
+	}
+
+	if len(fieldErrs) == 0 {
+		return nil
+	}
+
+	log.Info("rejected", "invalid field(s)", fieldErrs.ToAggregate().Error())
+
+	return apierrors.NewInvalid(kafkaUser.GetObjectKind().GroupVersionKind().GroupKind(), kafkaUser.Name, fieldErrs)
+}
+
+func (validator KafkaUserValidator) validateKafkaUser(ctx context.Context, kafkaUser *banzaicloudv1alpha1.KafkaUser) (field.ErrorList, error) {
+	var (
+		err     error
+		allErrs field.ErrorList
+		logMsg  string
+
+		kafkaCluster     *v1beta1.KafkaCluster
+		clusterNamespace = util.GetClusterRefNamespace(kafkaUser.GetNamespace(), kafkaUser.Spec.ClusterRef)
+		clusterName      = kafkaUser.Spec.ClusterRef.Name
+	)
+
+	if kafkaCluster, err = k8sutil.LookupKafkaCluster(ctx, validator.Client, clusterName, clusterNamespace); err != nil {
+		if apierrors.IsNotFound(err) {
+			logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace)
+			allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
+			return allErrs, nil
+		}
+
+		return nil, err
+	}
+
+	if k8sutil.IsMarkedForDeletion(kafkaCluster.ObjectMeta) {
+		return nil, errors.New("referenced KafkaCluster CR is being deleted")
+	}
+
+	if util.ObjectManagedByClusterRegistry(kafkaCluster) {
+		// referencing remote Kafka clusters is not allowed
+		logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace)
+		allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
+	}
+
+	if kafkaUser.Spec.GetIfCertShouldBeCreated() {
+		// avoid panic if
the user wants to create a kafka user but the cluster is in plaintext mode + if kafkaCluster.Spec.ListenersConfig.SSLSecrets == nil && kafkaUser.Spec.PKIBackendSpec == nil { + logMsg = fmt.Sprintf("KafkaCluster CR '%s' in namespace '%s' is in plaintext mode, "+ + "therefore 'spec.pkiBackendSpec' must be provided to create certificate", clusterName, clusterNamespace) + allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("pkiBackendSpec"), kafkaUser.Spec.PKIBackendSpec, logMsg)) + } + } + + return allErrs, nil +} diff --git a/pkg/webhooks/kafkauser_validator_test.go b/pkg/webhooks/kafkauser_validator_test.go new file mode 100644 index 000000000..37ae6ce25 --- /dev/null +++ b/pkg/webhooks/kafkauser_validator_test.go @@ -0,0 +1,222 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package webhooks + +import ( + "context" + "testing" + "time" + + "emperror.dev/errors" + clusterregv1alpha1 "github.com/cisco-open/cluster-registry-controller/api/v1alpha1" + "github.com/go-logr/logr" + "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + runtimeClient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/banzaicloud/koperator/api/v1alpha1" + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/util" +) + +func newMockClusterBeingDeleted() *v1beta1.KafkaCluster { + return &v1beta1.KafkaCluster{ + TypeMeta: metav1.TypeMeta{ + Kind: "KafkaCluster", + APIVersion: "v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-being-deleted", + Namespace: "test-namespace", + DeletionTimestamp: &metav1.Time{Time: time.Now()}, + }, + Spec: v1beta1.KafkaClusterSpec{}, + } +} + +func newMockClusterWithNoSSLSecrets() *v1beta1.KafkaCluster { + return &v1beta1.KafkaCluster{ + TypeMeta: metav1.TypeMeta{ + Kind: "KafkaCluster", + APIVersion: "v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-no-sslsecrets", + Namespace: "test-namespace", + }, + Spec: v1beta1.KafkaClusterSpec{ + ListenersConfig: v1beta1.ListenersConfig{ + InternalListeners: []v1beta1.InternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{ + Type: "plaintext", + Name: "test-listener", + ContainerPort: 9091, + }, + }, + }, + }, + }, + } +} + +func newMockRemoteCluster() *v1beta1.KafkaCluster { + return &v1beta1.KafkaCluster{ + TypeMeta: metav1.TypeMeta{ + Kind: "KafkaCluster", + APIVersion: "v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-remote", + Namespace: "test-namespace", + Annotations: map[string]string{clusterregv1alpha1.OwnershipAnnotation: "test-val"}, + }, + Spec: v1beta1.KafkaClusterSpec{}, + } +} + +func newMockClient() runtimeClient.Client { + scheme := runtime.NewScheme() + _ 
= v1beta1.AddToScheme(scheme) + _ = v1alpha1.AddToScheme(scheme) + + return fake.NewClientBuilder().WithScheme(scheme).Build() +} + +func TestValidateKafkaUser(t *testing.T) { + testCases := []struct { + testName string + kafkaUser *v1alpha1.KafkaUser + kafkaCluster *v1beta1.KafkaCluster + expectedErrors error + }{ + { + testName: "referenced KafkaCluster CR is not found - name misconfigured", + kafkaUser: &v1alpha1.KafkaUser{ + TypeMeta: metav1.TypeMeta{Kind: "KafkaUser", APIVersion: "kafka.banzaicloud.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-user", Namespace: "test-namespace"}, + Spec: v1alpha1.KafkaUserSpec{ + ClusterRef: v1alpha1.ClusterReference{ + Name: "invalid-name", + Namespace: "test-namespace", + }, + }, + }, + kafkaCluster: newMockCluster(), + expectedErrors: apierrors.NewInvalid(schema.GroupKind{Group: "kafka.banzaicloud.io", Kind: "KafkaUser"}, + "test-user", append(field.ErrorList{}, field.Invalid( + field.NewPath("spec").Child("clusterRef"), + v1alpha1.ClusterReference{Name: "invalid-name", Namespace: "test-namespace"}, + "KafkaCluster CR 'invalid-name' in the namespace 'test-namespace' does not exist"))), + }, + { + testName: "referenced KafkaCluster CR is not found - namespace misconfigured", + kafkaUser: &v1alpha1.KafkaUser{ + TypeMeta: metav1.TypeMeta{Kind: "KafkaUser", APIVersion: "kafka.banzaicloud.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-user", Namespace: "test-namespace"}, + Spec: v1alpha1.KafkaUserSpec{ + ClusterRef: v1alpha1.ClusterReference{ + Name: "test-cluster", + Namespace: "invalid-namespace", + }, + }, + }, + kafkaCluster: newMockCluster(), + expectedErrors: apierrors.NewInvalid(schema.GroupKind{Group: "kafka.banzaicloud.io", Kind: "KafkaUser"}, + "test-user", append(field.ErrorList{}, field.Invalid( + field.NewPath("spec").Child("clusterRef"), v1alpha1.ClusterReference{Name: "test-cluster", Namespace: "invalid-namespace"}, + "KafkaCluster CR 'test-cluster' in the namespace 'invalid-namespace' does not exist"))), + }, + { + testName: "referenced KafkaCluster CR is being deleted", + kafkaUser: &v1alpha1.KafkaUser{ + TypeMeta: metav1.TypeMeta{Kind: "KafkaUser", APIVersion: "kafka.banzaicloud.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-user", Namespace: "test-namespace"}, + Spec: v1alpha1.KafkaUserSpec{ + ClusterRef: v1alpha1.ClusterReference{ + Name: "test-cluster-being-deleted", + Namespace: "test-namespace", + }, + }, + }, + kafkaCluster: newMockClusterBeingDeleted(), + expectedErrors: apierrors.NewInternalError(errors.WithMessage(errors.New("referenced KafkaCluster CR is being deleted"), errorDuringValidationMsg)), + }, + { + testName: "referenced KafkaCluster CR is a remote CR", + kafkaUser: &v1alpha1.KafkaUser{ + TypeMeta: metav1.TypeMeta{Kind: "KafkaUser", APIVersion: "kafka.banzaicloud.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-user", Namespace: "test-namespace"}, + Spec: v1alpha1.KafkaUserSpec{ + ClusterRef: v1alpha1.ClusterReference{ + Name: "test-cluster-remote", + Namespace: "test-namespace", + }, + CreateCert: util.BoolPointer(false), + }, + }, + kafkaCluster: newMockRemoteCluster(), + expectedErrors: apierrors.NewInvalid(schema.GroupKind{Group: "kafka.banzaicloud.io", Kind: "KafkaUser"}, + "test-user", append(field.ErrorList{}, field.Invalid( + field.NewPath("spec").Child("clusterRef"), v1alpha1.ClusterReference{Name: "test-cluster-remote", Namespace: "test-namespace"}, + "KafkaCluster CR 'test-cluster-remote' in the namespace 'test-namespace' is a remote resource"))), + }, + { + 
testName: "certificate needs to be created for the KafkaUser but the corresponding KafkaCluster's" + + " listenerConfig doesn't have SSL secrets and there is no PKIBackendSpec specified under the KafkaUser", + kafkaUser: &v1alpha1.KafkaUser{ + TypeMeta: metav1.TypeMeta{Kind: "KafkaUser", APIVersion: "kafka.banzaicloud.io/v1alpha1"}, + ObjectMeta: metav1.ObjectMeta{Name: "test-user", Namespace: "test-namespace"}, + Spec: v1alpha1.KafkaUserSpec{ + ClusterRef: v1alpha1.ClusterReference{ + Name: "test-cluster-no-sslsecrets", + Namespace: "test-namespace", + }, + CreateCert: util.BoolPointer(true), + }, + }, + kafkaCluster: newMockClusterWithNoSSLSecrets(), + expectedErrors: apierrors.NewInvalid(schema.GroupKind{Group: "kafka.banzaicloud.io", Kind: "KafkaUser"}, + "test-user", append(field.ErrorList{}, field.Invalid( + field.NewPath("spec").Child("pkiBackendSpec"), nil, + "KafkaCluster CR 'test-cluster-no-sslsecrets' in namespace 'test-namespace' is in plaintext"+ + " mode, therefore 'spec.pkiBackendSpec' must be provided to create certificate"))), + }, + } + + ctx := context.Background() + + for _, testCase := range testCases { + test := testCase + t.Run(test.testName, func(t *testing.T) { + t.Parallel() + userValidator := KafkaUserValidator{ + Client: newMockClient(), + Log: logr.Discard(), + } + + err := userValidator.Client.Create(ctx, test.kafkaCluster) + require.Nil(t, err) + + actualErrs := userValidator.ValidateCreate(context.Background(), test.kafkaUser) + require.Equal(t, test.expectedErrors, actualErrs) + }) + } +} From adad7a00d00166cbc85dfb9ee48fae2b62ae00a7 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 11 May 2023 15:52:35 -0400 Subject: [PATCH 2/3] Add if check back to avoid hitting edge case --- controllers/kafkauser_controller.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/controllers/kafkauser_controller.go b/controllers/kafkauser_controller.go index 3bd0863d3..ab8314724 100644 --- a/controllers/kafkauser_controller.go +++ b/controllers/kafkauser_controller.go @@ -196,6 +196,13 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R var kafkaUser string if instance.Spec.GetIfCertShouldBeCreated() { + // Avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode + if cluster.Spec.ListenersConfig.SSLSecrets == nil && instance.Spec.PKIBackendSpec == nil { + // we should never see this scenario due to the KafkaUser validation webhook + // the only edge case is when cluster.Spec.ListenersConfig.SSLSecrets is set to nil during operator upgrade + return requeueWithError(reqLogger, "could not create kafka user since user specific PKI not configured", errors.New("failed to create kafka user")) + } + var backend v1beta1.PKIBackend if instance.Spec.PKIBackendSpec != nil { backend = v1beta1.PKIBackend(instance.Spec.PKIBackendSpec.PKIBackend) From 7a083eb6dabe8120bfe5284190feccc1416e5056 Mon Sep 17 00:00:00 2001 From: Darren Lau Date: Thu, 11 May 2023 16:04:47 -0400 Subject: [PATCH 3/3] Update comment --- controllers/kafkauser_controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/controllers/kafkauser_controller.go b/controllers/kafkauser_controller.go index ab8314724..494d90d50 100644 --- a/controllers/kafkauser_controller.go +++ b/controllers/kafkauser_controller.go @@ -198,8 +198,8 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R if instance.Spec.GetIfCertShouldBeCreated() { // Avoid panic if the user wants to create a kafka user but the cluster 
is in plaintext mode if cluster.Spec.ListenersConfig.SSLSecrets == nil && instance.Spec.PKIBackendSpec == nil { - // we should never see this scenario due to the KafkaUser validation webhook - // the only edge case is when cluster.Spec.ListenersConfig.SSLSecrets is set to nil during operator upgrade + // normally we should never see this scenario due to the KafkaUser validation webhook + // the only edge case is when cluster.Spec.ListenersConfig.SSLSecrets is set to nil at the beginning of this Reconcile flow return requeueWithError(reqLogger, "could not create kafka user since user specific PKI not configured", errors.New("failed to create kafka user")) }
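
For reference, a KafkaUser similar to the sketch below would now be rejected at admission time by the new webhook instead of failing during reconciliation: it requests a certificate against a plaintext KafkaCluster and provides no pkiBackendSpec, which is the case exercised by the last test above. The manifest keys (clusterRef, createCert, pkiBackendSpec) are inferred from the v1alpha1 Go types in this patch, so treat the exact spelling as an assumption rather than a confirmed API reference.

apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  name: example-user
  namespace: test-namespace
spec:
  clusterRef:
    # references a KafkaCluster whose listenersConfig has no SSL secrets (plaintext mode)
    name: test-cluster-no-sslsecrets
    namespace: test-namespace
  # a certificate is requested but no pkiBackendSpec is given, so the webhook returns the
  # "spec.pkiBackendSpec ... must be provided to create certificate" field error shown in the tests
  createCert: true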