Skip to content

Commit

Permalink
Fixing Delete acl functionality (#984)
Browse files Browse the repository at this point in the history
* Fixing Delete acl functionality

* Adding missing param

* incorporating review comments
  • Loading branch information
shubhamcoc authored Jun 9, 2023
1 parent 811b8f2 commit 115f380
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
10 changes: 6 additions & 4 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,10 @@ func (r *KafkaUserReconciler) checkFinalizers(ctx context.Context, cluster *v1be
var err error
if util.StringSliceContains(instance.GetFinalizers(), userFinalizer) {
if len(instance.Spec.TopicGrants) > 0 {
if err = r.finalizeKafkaUserACLs(reqLogger, cluster, user); err != nil {
return requeueWithError(reqLogger, "failed to finalize kafkauser", err)
for _, topicGrant := range instance.Spec.TopicGrants {
if err = r.finalizeKafkaUserACLs(reqLogger, cluster, user, topicGrant.PatternType); err != nil {
return requeueWithError(reqLogger, "failed to finalize kafkauser", err)
}
}
}
// remove finalizer
Expand All @@ -350,7 +352,7 @@ func (r *KafkaUserReconciler) removeFinalizer(ctx context.Context, user *v1alpha
return err
}

func (r *KafkaUserReconciler) finalizeKafkaUserACLs(reqLogger logr.Logger, cluster *v1beta1.KafkaCluster, user string) error {
func (r *KafkaUserReconciler) finalizeKafkaUserACLs(reqLogger logr.Logger, cluster *v1beta1.KafkaCluster, user string, patternType v1alpha1.KafkaPatternType) error {
if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) {
reqLogger.Info("Cluster is being deleted, skipping ACL deletion")
return nil
Expand All @@ -362,7 +364,7 @@ func (r *KafkaUserReconciler) finalizeKafkaUserACLs(reqLogger logr.Logger, clust
return err
}
defer close()
if err = broker.DeleteUserACLs(user); err != nil {
if err = broker.DeleteUserACLs(user, patternType); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion controllers/tests/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,5 @@ func resetMockKafkaClient(kafkaCluster *v1beta1.KafkaCluster) {
}

// delete all acls
_ = mockKafkaClient.DeleteUserACLs("")
_ = mockKafkaClient.DeleteUserACLs("", "")
}
2 changes: 1 addition & 1 deletion pkg/kafkaclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type KafkaClient interface {
DescribeTopic(string) (*sarama.TopicMetadata, error)
CreateUserACLs(v1alpha1.KafkaAccessType, v1alpha1.KafkaPatternType, string, string) error
ListUserACLs() ([]sarama.ResourceAcls, error)
DeleteUserACLs(string) error
DeleteUserACLs(string, v1alpha1.KafkaPatternType) error

Brokers() map[int32]string
DescribeCluster() ([]*sarama.Broker, int32, error)
Expand Down
16 changes: 12 additions & 4 deletions pkg/kafkaclient/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,27 @@ func (k *kafkaClient) ListUserACLs() ([]sarama.ResourceAcls, error) {
}

// DeleteUserACLs removes all ACLs for a given user
func (k *kafkaClient) DeleteUserACLs(dn string) (err error) {
func (k *kafkaClient) DeleteUserACLs(dn string, patternType v1alpha1.KafkaPatternType) error {
if patternType == "" {
patternType = v1alpha1.KafkaPatternTypeDefault
}
aclPatternType := AclPatternTypeMapping(patternType)
matches, err := k.admin.DeleteACL(sarama.AclFilter{
Principal: &dn,
Principal: &dn,
ResourcePatternTypeFilter: aclPatternType,
Operation: sarama.AclOperationDelete,
ResourceType: sarama.AclResourceTopic,
PermissionType: sarama.AclPermissionAny,
}, false)
if err != nil {
return
return err
}
for _, x := range matches {
if x.Err != sarama.ErrNoError {
return x.Err
}
}
return
return nil
}

func (k *kafkaClient) createReadACLs(dn string, topic string, patternType sarama.AclResourcePatternType) (err error) {
Expand Down
21 changes: 15 additions & 6 deletions pkg/kafkaclient/users_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,25 @@ func TestCreateUserACLs(t *testing.T) {
func TestDeleteUserACLs(t *testing.T) {
client := newOpenedMockClient()

if err := client.DeleteUserACLs("test-user"); err != nil {
t.Error("Expected no error, got:", err)
}
validPatternTypes := []v1alpha1.KafkaPatternType{
"any",
"literal",
"match",
"prefixed",
""}

if err := client.DeleteUserACLs("with-error"); err == nil {
t.Error("Expected error, got nil")
for _, patternType := range validPatternTypes {
if err := client.DeleteUserACLs("test-user", patternType); err != nil {
t.Error("Expected no error, got:", err)
}

if err := client.DeleteUserACLs("with-error", patternType); err == nil {
t.Error("Expected error, got nil")
}
}

client.admin, _ = newMockClusterAdminFailOps([]string{}, sarama.NewConfig())
if err := client.DeleteUserACLs("test-userr"); err == nil {
if err := client.DeleteUserACLs("test-userr", ""); err == nil {
t.Error("Expected error, got nil")
}
}

0 comments on commit 115f380

Please sign in to comment.