diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index c91df044c..f8fbbad67 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -57,7 +57,7 @@ type brokerReconcilePriority int const ( componentName = "kafka" brokerConfigTemplate = "%s-config" - brokerStorageTemplate = "%s-%d-storage-%d-" + brokerStorageTemplate = "%s-%d-storage-%d" brokerConfigMapVolumeMount = "broker-config" kafkaDataVolumeMount = "kafka-data" diff --git a/pkg/resources/kafka/pvc.go b/pkg/resources/kafka/pvc.go index eb41184a0..44669bb9c 100644 --- a/pkg/resources/kafka/pvc.go +++ b/pkg/resources/kafka/pvc.go @@ -29,7 +29,7 @@ import ( func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig, _ logr.Logger) runtime.Object { return &corev1.PersistentVolumeClaim{ - ObjectMeta: templates.ObjectMetaWithGeneratedNameAndAnnotations( + ObjectMeta: templates.ObjectMetaWithNameAndAnnotations( fmt.Sprintf(brokerStorageTemplate, r.KafkaCluster.Name, brokerId, storageIndex), util.MergeLabels( kafka.LabelsForKafka(r.KafkaCluster.Name), diff --git a/pkg/resources/templates/templates.go b/pkg/resources/templates/templates.go index 9324da56d..7285e48d7 100644 --- a/pkg/resources/templates/templates.go +++ b/pkg/resources/templates/templates.go @@ -87,6 +87,25 @@ func ObjectMetaWithGeneratedName(namePrefix string, labels map[string]string, cl } } +// ObjectMetaWithName returns a metav1.ObjectMeta object with labels, ownerReference and name +func ObjectMetaWithName(name string, labels map[string]string, cluster *v1beta1.KafkaCluster) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Name: name, + Namespace: cluster.Namespace, + Labels: ObjectMetaLabels(cluster, labels), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: cluster.APIVersion, + Kind: cluster.Kind, + Name: cluster.Name, + UID: cluster.UID, + Controller: util.BoolPointer(true), + BlockOwnerDeletion: util.BoolPointer(true), + }, + }, + } +} + func ObjectMetaLabels(cluster *v1beta1.KafkaCluster, l map[string]string) map[string]string { if cluster.Spec.PropagateLabels { return util.MergeLabels(cluster.Labels, l) @@ -108,6 +127,13 @@ func ObjectMetaWithGeneratedNameAndAnnotations(namePrefix string, labels map[str return o } +// ObjectMetaWithNameAndAnnotations returns a metav1.ObjectMeta object with labels, ownerReference, name and annotations +func ObjectMetaWithNameAndAnnotations(name string, labels map[string]string, annotations map[string]string, cluster *v1beta1.KafkaCluster) metav1.ObjectMeta { + o := ObjectMetaWithName(name, labels, cluster) + o.Annotations = annotations + return o +} + // ObjectMetaClusterScope returns a metav1.ObjectMeta object with labels, ownerReference, name and annotations func ObjectMetaClusterScope(name string, labels map[string]string, cluster *v1beta1.KafkaCluster) metav1.ObjectMeta { return metav1.ObjectMeta{