Skip to content

Commit

Permalink
Add validation webhook for KafkaUser
Browse files Browse the repository at this point in the history
  • Loading branch information
panyuenlau committed May 11, 2023
1 parent 7fd0c80 commit fc33950
Show file tree
Hide file tree
Showing 11 changed files with 403 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ webhooks:
resources:
- kafkaclusters
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
caBundle: {{ $caCrt }}
service:
name: "{{ include "kafka-operator.fullname" . }}-operator"
namespace: {{ .Release.Namespace }}
path: /validate-kafka-banzaicloud-io-v1alpha1-kafkauser
failurePolicy: Fail
name: kafkausers.kafka.banzaicloud.io
rules:
- apiGroups:
- kafka.banzaicloud.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- kafkausers
sideEffects: None
---
apiVersion: v1
kind: Secret
Expand Down
12 changes: 0 additions & 12 deletions controllers/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
Expand Down Expand Up @@ -62,17 +61,6 @@ func reconciled() (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// getClusterRefNamespace returns the expected namespace for a kafka cluster
// referenced by a user/topic CR. It takes the namespace of the CR as the first
// argument and the reference itself as the second.
func getClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string {
clusterNamespace := ref.Namespace
if clusterNamespace == "" {
return ns
}
return clusterNamespace
}

// clusterLabelString returns the label value for a cluster reference
func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
return fmt.Sprintf("%s.%s", cluster.Name, cluster.Namespace)
Expand Down
15 changes: 0 additions & 15 deletions controllers/controller_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
"github.com/banzaicloud/koperator/pkg/kafkaclient"
Expand Down Expand Up @@ -68,20 +67,6 @@ func TestReconciled(t *testing.T) {
}
}

func TestGetClusterRefNamespace(t *testing.T) {
ns := testNamespace
ref := v1alpha1.ClusterReference{
Name: "test-cluster",
}
if refNS := getClusterRefNamespace(ns, ref); refNS != testNamespace {
t.Error("Expected to get 'test-namespace', got:", refNS)
}
ref.Namespace = "another-namespace"
if refNS := getClusterRefNamespace(ns, ref); refNS != "another-namespace" {
t.Error("Expected to get 'another-namespace', got:", refNS)
}
}

func TestClusterLabelString(t *testing.T) {
cluster := &v1beta1.KafkaCluster{}
cluster.Name = "test-cluster"
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (r *KafkaTopicReconciler) Reconcile(ctx context.Context, request reconcile.
}

// Get the referenced kafkacluster
clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
var cluster *v1beta1.KafkaCluster
if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil {
// This shouldn't trigger anymore, but leaving it here as a safetybelt
Expand Down
7 changes: 1 addition & 6 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R
}

// Get the referenced kafkacluster
clusterNamespace := getClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
clusterNamespace := util.GetClusterRefNamespace(instance.Namespace, instance.Spec.ClusterRef)
var cluster *v1beta1.KafkaCluster
if cluster, err = k8sutil.LookupKafkaCluster(ctx, r.Client, instance.Spec.ClusterRef.Name, clusterNamespace); err != nil {
// This shouldn't trigger anymore, but leaving it here as a safetybelt
Expand All @@ -196,11 +196,6 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R
var kafkaUser string

if instance.Spec.GetIfCertShouldBeCreated() {
// Avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode
// TODO: refactor this and use webhook to validate if the cluster is eligible to create a kafka user
if cluster.Spec.ListenersConfig.SSLSecrets == nil && instance.Spec.PKIBackendSpec == nil {
return requeueWithError(reqLogger, "could not create kafka user since user specific PKI not configured", errors.New("failed to create kafka user"))
}
var backend v1beta1.PKIBackend
if instance.Spec.PKIBackendSpec != nil {
backend = v1beta1.PKIBackend(instance.Spec.PKIBackendSpec.PKIBackend)
Expand Down
12 changes: 12 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func main() {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaCluster")
os.Exit(1)
}

err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaTopic{}).
WithValidator(webhooks.KafkaTopicValidator{
Client: mgr.GetClient(),
Expand All @@ -237,6 +238,17 @@ func main() {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaTopic")
os.Exit(1)
}

err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaUser{}).
WithValidator(webhooks.KafkaUserValidator{
Client: mgr.GetClient(),
Log: mgr.GetLogger().WithName("webhooks").WithName("KafkaUser"),
}).
Complete()
if err != nil {
setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaUser")
os.Exit(1)
}
}

// +kubebuilder:scaffold:builder
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,14 @@ func RetryOnError(backoff wait.Backoff, fn func() error, isRetryableError func(e
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return RetryOnError(backoff, fn, apierrors.IsConflict)
}

// GetClusterRefNamespace returns the expected namespace for a kafka cluster
// referenced by a user/topic CR. It takes the namespace of the CR as the first
// argument and the reference itself as the second.
func GetClusterRefNamespace(ns string, ref v1alpha1.ClusterReference) string {
clusterNamespace := ref.Namespace
if clusterNamespace == "" {
return ns
}
return clusterNamespace
}
15 changes: 15 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"reflect"
"testing"

"github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/util/istioingress"

Expand Down Expand Up @@ -686,3 +687,17 @@ cruise.control.metrics.reporter.kubernetes.mode=true`,
}
}
}

func TestGetClusterRefNamespace(t *testing.T) {
const testNamespace = "test-namespace"
ref := v1alpha1.ClusterReference{
Name: "test-cluster",
}
if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != testNamespace {
t.Error("Expected to get 'test-namespace', got:", refNS)
}
ref.Namespace = "another-namespace"
if refNS := GetClusterRefNamespace(testNamespace, ref); refNS != "another-namespace" {
t.Error("Expected to get 'another-namespace', got:", refNS)
}
}
6 changes: 3 additions & 3 deletions pkg/webhooks/kafkatopic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,20 @@ func (s *KafkaTopicValidator) validateKafkaTopic(ctx context.Context, log logr.L
log.Info("Deleted as a result of a cluster deletion")
return nil, nil
}
logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace)
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg))
// retrun is needed here because later this cluster is used for further checks but it is nil
return allErrs, nil
}
if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) {
// Let this through, it's a delete topic request from a parent cluster being deleted
log.Info("Cluster is going down for deletion, assuming a delete topic request")
log.Info("Referenced KafkaCluster CR is being deleted, assuming a delete topic request")
return nil, nil
}

if util.ObjectManagedByClusterRegistry(cluster) {
// referencing remote Kafka clusters is not allowed
logMsg = fmt.Sprintf("kafkaCluster '%s' in the namespace '%s' is a remote kafka cluster", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace)
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef").Child("name"), clusterName, logMsg))
}

Expand Down
117 changes: 117 additions & 0 deletions pkg/webhooks/kafkauser_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package webhooks

import (
"context"
"fmt"

"emperror.dev/errors"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/client"

banzaicloudv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1"
"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/k8sutil"
"github.com/banzaicloud/koperator/pkg/util"
)

type KafkaUserValidator struct {
Client client.Client
Log logr.Logger
}

// ValidateCreate validates if the user-provided KafkaUser specifications valid for creation
func (validator KafkaUserValidator) ValidateCreate(ctx context.Context, obj runtime.Object) error {
return validator.validate(ctx, obj)
}

// ValidateUpdate validates if the user-provided KafkaUser specifications valid for update
func (validator KafkaUserValidator) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) error {
return validator.validate(ctx, newObj)
}

// ValidateDeleet validates if the user-selctd KafkaUser specifications valid for deletion
func (validator KafkaUserValidator) ValidateDelete(_ context.Context, _ runtime.Object) error {
// nothing to validate on deletion
return nil
}

func (validator KafkaUserValidator) validate(ctx context.Context, obj runtime.Object) error {
var (
kafkaUser = obj.(*banzaicloudv1alpha1.KafkaUser)
log = validator.Log.WithValues("name", kafkaUser.GetName(), "namespace", kafkaUser.GetNamespace())
)

fieldErrs, err := validator.validateKafkaUser(ctx, kafkaUser)
if err != nil {
log.Error(err, errorDuringValidationMsg)
return apierrors.NewInternalError(errors.WithMessage(err, errorDuringValidationMsg))
}

if len(fieldErrs) == 0 {
return nil
}

log.Info("rejected", "invalid field(s)", fieldErrs.ToAggregate().Error())

return apierrors.NewInvalid(kafkaUser.GetObjectKind().GroupVersionKind().GroupKind(), kafkaUser.Name, fieldErrs)
}

func (validator KafkaUserValidator) validateKafkaUser(ctx context.Context, kafkaUser *banzaicloudv1alpha1.KafkaUser) (field.ErrorList, error) {
var (
err error
allErrs field.ErrorList
logMsg string

kafkaCluster *v1beta1.KafkaCluster
clusterNamespace = util.GetClusterRefNamespace(kafkaUser.GetNamespace(), kafkaUser.Spec.ClusterRef)
clusterName = kafkaUser.Spec.ClusterRef.Name
)

if kafkaCluster, err = k8sutil.LookupKafkaCluster(ctx, validator.Client, clusterName, clusterNamespace); err != nil {
if apierrors.IsNotFound(err) {
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' does not exist", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
return allErrs, nil
}

return nil, err
}

if k8sutil.IsMarkedForDeletion(kafkaCluster.ObjectMeta) {
return nil, errors.New("referenced KafkaCluster CR is being deleted")
}

if util.ObjectManagedByClusterRegistry(kafkaCluster) {
// referencing remote Kafka clusters is not allowed
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in the namespace '%s' is a remote resource", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("clusterRef"), kafkaUser.Spec.ClusterRef, logMsg))
}

if kafkaUser.Spec.GetIfCertShouldBeCreated() {
// avoid panic if the user wants to create a kafka user but the cluster is in plaintext mode
if kafkaCluster.Spec.ListenersConfig.SSLSecrets == nil && kafkaUser.Spec.PKIBackendSpec == nil {
logMsg = fmt.Sprintf("KafkaCluster CR '%s' in namespace '%s' is in plaintext mode, "+
"therefore 'spec.pkiBackendSpec' must be provided to create certificate", clusterName, clusterNamespace)
allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("pkiBackendSpec"), kafkaUser.Spec.PKIBackendSpec, logMsg))
}
}

return allErrs, nil
}
Loading

0 comments on commit fc33950

Please sign in to comment.