diff --git a/tests/e2e/go.mod b/tests/e2e/go.mod index 2d1b5df6ac..802aa0152f 100644 --- a/tests/e2e/go.mod +++ b/tests/e2e/go.mod @@ -14,6 +14,7 @@ require ( github.com/twmb/franz-go v1.13.5 k8s.io/apiextensions-apiserver v0.26.4 k8s.io/apimachinery v0.26.4 + k8s.io/utils v0.0.0-20230726121419-3b25d923346b sigs.k8s.io/yaml v1.3.0 ) @@ -125,7 +126,6 @@ require ( k8s.io/client-go v0.26.4 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 // indirect - k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) diff --git a/tests/e2e/go.sum b/tests/e2e/go.sum index c69c724d96..98c2ace1f8 100644 --- a/tests/e2e/go.sum +++ b/tests/e2e/go.sum @@ -845,8 +845,8 @@ k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715 h1:tBEbstoM+K0FiBV5KGAKQ0kuvf54v/hwpldiJt69w1s= k8s.io/kube-openapi v0.0.0-20221207184640-f3cff1453715/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 h1:KTgPnR10d5zhztWptI952TNtt/4u5h3IzDXkdIMuo2Y= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/tests/e2e/k8s.go b/tests/e2e/k8s.go index f6b7a3761d..c1bd7b7401 100644 --- a/tests/e2e/k8s.go +++ b/tests/e2e/k8s.go @@ -81,7 +81,7 @@ func applyK8sResourceManifestFromString(kubectlOptions k8s.KubectlOptions, manif // applyK8sResourceFromTemplate generates manifest from the specified go-template based on values // and applies the specified manifest to the provided kubectl context and namespace. -func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]interface{}, extraArgs ...string) error { +func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFile string, values map[string]any, extraArgs ...string) error { By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) var manifest bytes.Buffer rawTemplate, err := os.ReadFile(templateFile) @@ -96,6 +96,24 @@ func applyK8sResourceFromTemplate(kubectlOptions k8s.KubectlOptions, templateFil return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) } +// applyK8sResourceFromTemplate generates manifest from the specified go-template based on values +// and applies the specified manifest to the provided kubectl context and namespace. +func applyK8sResourceFromTemplate_2(kubectlOptions k8s.KubectlOptions, templateFile string, values any, extraArgs ...string) error { + By(fmt.Sprintf("Generating k8s manifest from template %s", templateFile)) + var manifest bytes.Buffer + rawTemplate, err := os.ReadFile(templateFile) + if err != nil { + return err + } + t := template.Must(template.New("template").Funcs(sprig.TxtFuncMap()).Parse(string(rawTemplate))) + err = t.Execute(&manifest, values) + if err != nil { + return err + } + fmt.Printf("###\nManifest is :\n%s\n###\n", manifest.String()) + return applyK8sResourceManifestFromString(kubectlOptions, manifest.String(), extraArgs...) +} + // isExistingK8SResource queries a Resource by it's kind, namespace and name and // returns true if it's found, false otherwise func isExistingK8SResource( diff --git a/tests/e2e/kafka.go b/tests/e2e/kafka.go index 54b7cfd9b8..316c95fcc7 100644 --- a/tests/e2e/kafka.go +++ b/tests/e2e/kafka.go @@ -21,6 +21,7 @@ import ( "github.com/gruntwork-io/terratest/modules/k8s" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "k8s.io/utils/ptr" ) // requireDeleteKafkaTopic deletes kafkaTopic resource. @@ -34,14 +35,29 @@ func requireDeleteKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string // requireDeployingKafkaTopic deploys a kafkaTopic resource from a template func requireDeployingKafkaTopic(kubectlOptions k8s.KubectlOptions, topicName string) { It("Deploying KafkaTopic CR", func() { - err := applyK8sResourceFromTemplate(kubectlOptions, - kafkaTopicTemplate, - map[string]interface{}{ - "Name": topicName, - "TopicName": topicName, - "Namespace": kubectlOptions.Namespace, + // err := applyK8sResourceFromTemplate(kubectlOptions, + // kafkaTopicTemplate, + // map[string]interface{}{ + // "Name": topicName, + // "TopicName": topicName, + // "Namespace": kubectlOptions.Namespace, + // }, + // ) + values := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaClusterName, + Namespace: kubectlOptions.Namespace, }, - ) + Name: topicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: topicName, + } + + err := applyK8sResourceFromTemplate_2(kubectlOptions, kafkaTopicTemplate, values) + Expect(err).ShouldNot(HaveOccurred()) err = waitK8sResourceCondition(kubectlOptions, kafkaTopicKind, @@ -82,3 +98,23 @@ func requireDeployingKafkaUser(kubectlOptions k8s.KubectlOptions, userName strin }, defaultUserCreationWaitTime, 3*time.Second).Should(Equal(true)) }) } + +// kafkaTopicTemplateData is a struct that holds the relevant information and structure +// to fill out the template used to generate KafkaTopics. +// TODO: long term we should use the structs in the api module instead of these local structs. +type kafkaTopicTemplateData struct { + Annotations []string + ClusterRef kafkaTopicClusterRef + Name string + Namespace string + Partitions *int32 + ReplicationFactor *int32 + TopicName string +} + +// kafkaTopicClusterRef holds the information relevant to identifying a KafkaCluster within a KafkaTopic CR. +// TODO: Long term, we should use the structs in the api module instead of these local structs. +type kafkaTopicClusterRef struct { + Name string + Namespace string +} diff --git a/tests/e2e/kafkatopic_webhook.go b/tests/e2e/kafkatopic_webhook.go index 69f68933f9..be44c3e124 100644 --- a/tests/e2e/kafkatopic_webhook.go +++ b/tests/e2e/kafkatopic_webhook.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" ) func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { @@ -35,9 +36,339 @@ func testWebhookKafkaTopic(kafkaCluster types.NamespacedName) { kubectlOptions.Namespace = kafkaCluster.Namespace - testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) - testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) + //testWebhookCreateKafkaTopic(kubectlOptions, kafkaCluster) + //testWebhookUpdateKafkaTopic(kubectlOptions, kafkaCluster) + testWebhookCreateKafkaTopic_2(kubectlOptions, kafkaCluster) + testWebhookUpdateKafkaTopic_2(kubectlOptions, kafkaCluster) +} + +func testWebhookCreateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Create", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.ClusterRef.Namespace), + ) + }) + + It("Test 0 partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: %d: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + // In the current validation webhook implementation, this case can only be encountered on a Create operation + It("Test ReplicationFactor larger than number of brokers", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(10)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.replicationFactor: Invalid value: 10: replication factor is larger than the number of nodes in the kafka cluster (available brokers: 3) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.replicationFactor: Invalid value: %[2]d: replication factor is larger than the number of nodes in the kafka cluster", + caseData.Name, ptr.Deref(caseData.ReplicationFactor, -100)), + ) + }) + // Test case involving existing CRs but not necessarily an Update operation + When("Testing conflicts similar CRs", Ordered, func() { + var overlappingTopicName string = testInternalTopicName + requireDeployingKafkaTopic(kubectlOptions, overlappingTopicName) + + It("Testing conflict on spec.name", func() { + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + requireDeleteKafkaTopic(kubectlOptions, overlappingTopicName) + }) + }) +} + +func testWebhookUpdateKafkaTopic_2(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { + return When("Testing KafkaTopic Update", func() { + BeforeAll(func() { + Expect(isExistingK8SResource(kubectlOptions, kafkaKind, kafkaCluster.Name)).To(BeTrue()) + }) + + const nonExistent string = "non-existent" + + // Update operation implies having a CR with the same name in place + requireDeployingKafkaTopic(kubectlOptions, testInternalTopicName) + + It("Test non-existent KafkaCluster", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: nonExistent, // Note: This is a deliberately inserted error for this test case. + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", + caseData.Name, caseData.ClusterRef.Name, caseData.Namespace), + ) + }) + + // A successfully created KafkaTopic CR cannot have 0 for either Partition or ReplicationFactor. + // At the same time, during an Update, a KafkaTopic cannot have its: + // * spec.partitions decreased + // * spec.replicationFactor changed (not just decreased) + // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. + It("Test 0 values partitions and replicationFactor", func() { + caseData := kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: testInternalTopicName, + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + ReplicationFactor: ptr.To(int32(0)), // Note: This is a deliberately inserted error for this test case. + TopicName: testInternalTopicName, + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: + // * spec.partitions: Invalid value: 0: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0 (or set it to be -1 to use the broker's default) + // * spec.partitions: Invalid value: 0: kafka does not support decreasing partition count on an existing topic (from 2 to 0) + // * spec.replicationFactor: Invalid value: 0: kafka does not support changing the replication factor on an existing topic (from 2 to 0) + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), + ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), + ContainSubstring("spec.partitions: Invalid value: %d: kafka does not support decreasing partition count on an existing topic", ptr.Deref(caseData.Partitions, -100)), + ContainSubstring("spec.replicationFactor: Invalid value: %d: kafka does not support changing the replication factor on an existing topic", ptr.Deref(caseData.ReplicationFactor, -100)), + )) + }) + + It("Testing conflict on spec.name", func() { + var overlappingTopicName string = testInternalTopicName + var caseData kafkaTopicTemplateData + + By("With managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: []string{"managedBy: koperator"}, // Note: This information is relevant to this particular test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err := applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) + Expect(err.Error()).To( + ContainSubstring("The KafkaTopic %[1]q is invalid: spec.name: Invalid value: %[2]q: kafkaTopic CR '%[2]s' in namesapce '%[3]s' is already referencing to Kafka topic '%[2]s'", + caseData.Name, overlappingTopicName, caseData.Namespace), + ) + + By("Without managedBy koperator annotation") + caseData = kafkaTopicTemplateData{ + Annotations: nil, // Note: This is a deliberately inserted error for this test case. + ClusterRef: kafkaTopicClusterRef{ + Name: kafkaCluster.Name, + Namespace: kafkaCluster.Namespace, + }, + Name: overlappingTopicName + "different-cr-name", // Note: This information is relevant to this particular test case. + Namespace: kubectlOptions.Namespace, + Partitions: ptr.To(int32(2)), + ReplicationFactor: ptr.To(int32(2)), + TopicName: overlappingTopicName, // Note: This is a deliberately inserted error for this test case. + } + err = applyK8sResourceFromTemplate_2( + kubectlOptions, + kafkaTopicTemplate, + caseData, + dryRunStrategyArgServer, + ) + Expect(err).To(HaveOccurred()) + // Example error: + // error while running command: exit status 1; The KafkaTopic "topic-test-internal-different-cr-name" is invalid: + // * spec.name: Invalid value: "topic-test-internal": kafkaTopic CR 'topic-test-internal' in namesapce 'kafka' is already referencing to Kafka topic 'topic-test-internal' + // * spec.name: Invalid value: "topic-test-internal": topic "topic-test-internal" already exists on kafka cluster and it is not managed by Koperator, + // if you want it to be managed by Koperator so you can modify its configurations through a KafkaTopic CR, + // add this "managedBy: koperator" annotation to this KafkaTopic CR + Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(5)) + Expect(err.Error()).To(And( + ContainSubstring("The KafkaTopic %q is invalid:", caseData.Name), + ContainSubstring("spec.name: Invalid value: %[1]q: kafkaTopic CR '%[1]s' in namesapce '%[2]s' is already referencing to Kafka topic '%[1]s'\n", + overlappingTopicName, caseData.Namespace), + ContainSubstring("spec.name: Invalid value: %[1]q: topic %[1]q already exists on kafka cluster and it is not managed by Koperator", + overlappingTopicName), + )) + }) + + // Clean up the KafkaTopic set up to test Update operations against + requireDeleteKafkaTopic(kubectlOptions, testInternalTopicName) + }) } func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster types.NamespacedName) bool { @@ -67,17 +398,17 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ) Expect(err).To(HaveOccurred()) // Example error: - // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "NON-EXISTENT": kafkaCluster 'NON-EXISTENT' in the namespace 'kafka' does not exist + // error while running command: exit status 1; The KafkaTopic "topic-test-internal" is invalid: spec.clusterRef.name: Invalid value: "non-existent": kafkaCluster 'non-existent' in the namespace 'kafka' does not exist Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(1)) - Expect(err.Error()).To(And( + Expect(err.Error()).To( ContainSubstring("The KafkaTopic %[1]q is invalid: spec.clusterRef.name: Invalid value: %[2]q: kafkaCluster '%[2]s' in the namespace '%[3]s' does not exist", caseData["Name"], nonExistent, kubectlOptions.Namespace), - )) + ) }) It("Test 0 partitions and replicationFactor", func() { caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partition"] = "0" + caseData["Partitions"] = "0" caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, @@ -93,7 +424,7 @@ func testWebhookCreateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster Expect(len(strings.Split(err.Error(), "\n"))).To(Equal(3)) Expect(err.Error()).To(And( ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), - ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partition"]), + ContainSubstring("spec.partitions: Invalid value: %s: number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["Partitions"]), ContainSubstring("spec.replicationFactor: Invalid value: %s: replication factor must be larger than 0 (or set it to be -1 to use the broker's default)", caseData["ReplicationFactor"]), )) }) @@ -202,6 +533,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster "Name": nonExistent, "Namespace": kafkaCluster.Namespace, } + err := applyK8sResourceFromTemplate( kubectlOptions, kafkaTopicTemplate, @@ -225,7 +557,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster // Consequently, an Update test for 0 values will automatically also cover the decreasing/changing scenarios. It("Test 0 values partitions and replicationFactor", func() { caseData := copyMapWithStringKeys(baseKafkaTopicTemplateValues) - caseData["Partition"] = "0" + caseData["Partitions"] = "0" caseData["ReplicationFactor"] = "0" err := applyK8sResourceFromTemplate( kubectlOptions, @@ -245,7 +577,7 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster ContainSubstring("The KafkaTopic %q is invalid:", caseData["Name"]), ContainSubstring("spec.partitions: Invalid value: 0: number of partitions must be larger than 0"), ContainSubstring("spec.replicationFactor: Invalid value: 0: replication factor must be larger than 0"), - ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partition"]), + ContainSubstring("spec.partitions: Invalid value: %s: kafka does not support decreasing partition count on an existing topic", caseData["Partitions"]), ContainSubstring("spec.replicationFactor: Invalid value: %s: kafka does not support changing the replication factor on an existing topic", caseData["ReplicationFactor"]), )) }) @@ -309,12 +641,12 @@ func testWebhookUpdateKafkaTopic(kubectlOptions k8s.KubectlOptions, kafkaCluster }) } -func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { +func baseKafkaTopicData(kafkaTopic types.NamespacedName, kafkaCluster types.NamespacedName) map[string]interface{} { // string topic name first return map[string]interface{}{ "Name": kafkaTopic.Name, "TopicName": kafkaTopic.Name, "Namespace": kafkaTopic.Namespace, - "Partition": "2", + "Partitions": "2", "ReplicationFactor": "2", "ClusterRef": map[string]string{ "Name": kafkaCluster.Name, diff --git a/tests/e2e/koperator_suite_test.go b/tests/e2e/koperator_suite_test.go index 72051b0486..57a9b15c70 100644 --- a/tests/e2e/koperator_suite_test.go +++ b/tests/e2e/koperator_suite_test.go @@ -55,18 +55,18 @@ var _ = BeforeSuite(func() { }) var _ = When("Testing e2e test altogether", Ordered, func() { - var snapshottedInfo = &clusterSnapshot{} - snapshotCluster(snapshottedInfo) - testInstall() - testInstallZookeeperCluster() - testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") + // var snapshottedInfo = &clusterSnapshot{} + // snapshotCluster(snapshottedInfo) + // testInstall() + // testInstallZookeeperCluster() + // testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml") testWebhookKafkaTopic(types.NamespacedName{Name: kafkaClusterName, Namespace: koperatorLocalHelmDescriptor.Namespace}) - testProduceConsumeInternal() - testUninstallKafkaCluster() - testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") - testProduceConsumeInternalSSL(defaultTLSSecretName) - testUninstallKafkaCluster() - testUninstallZookeeperCluster() - testUninstall() - snapshotClusterAndCompare(snapshottedInfo) + // testProduceConsumeInternal() + // testUninstallKafkaCluster() + // testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml") + // testProduceConsumeInternalSSL(defaultTLSSecretName) + // testUninstallKafkaCluster() + // testUninstallZookeeperCluster() + // testUninstall() + // snapshotClusterAndCompare(snapshottedInfo) }) diff --git a/tests/e2e/templates/topic.yaml.tmpl b/tests/e2e/templates/topic.yaml.tmpl index 3f2680bf80..50dd2035dc 100644 --- a/tests/e2e/templates/topic.yaml.tmpl +++ b/tests/e2e/templates/topic.yaml.tmpl @@ -14,7 +14,7 @@ spec: name: {{or .ClusterRef.Name "kafka"}} namespace: {{or .ClusterRef.Namespace "kafka"}} name: {{ .TopicName }} - partitions: {{or .Partition 2}} + partitions: {{or .Partitions 2}} replicationFactor: {{or .ReplicationFactor 2}} config: "retention.ms": "604800000"