diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContext.java b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContext.java index 9d3fc70f8b..9a9fadf8f1 100644 --- a/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContext.java +++ b/spring-cloud-kubernetes-fabric8-discovery/src/main/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContext.java @@ -16,7 +16,6 @@ package org.springframework.cloud.kubernetes.fabric8.discovery; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.stream.Stream; @@ -28,6 +27,9 @@ import org.springframework.cloud.kubernetes.commons.discovery.EndpointNameAndNamespace; import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties; +import static java.util.Comparator.comparing; +import static java.util.Comparator.nullsLast; + /** * A simple holder for some instances needed for either Endpoints or EndpointSlice catalog * implementations. @@ -39,7 +41,7 @@ record Fabric8CatalogWatchContext(KubernetesClient kubernetesClient, KubernetesD static List state(Stream references) { return references.filter(Objects::nonNull).map(x -> new EndpointNameAndNamespace(x.getName(), x.getNamespace())) - .sorted(Comparator.comparing(EndpointNameAndNamespace::endpointName, String::compareTo)).toList(); + .sorted(comparing(EndpointNameAndNamespace::endpointName, nullsLast(String::compareTo))).toList(); } } diff --git a/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContextTests.java b/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContextTests.java new file mode 100644 index 0000000000..3324baaba1 --- /dev/null +++ b/spring-cloud-kubernetes-fabric8-discovery/src/test/java/org/springframework/cloud/kubernetes/fabric8/discovery/Fabric8CatalogWatchContextTests.java @@ -0,0 +1,76 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.kubernetes.fabric8.discovery; + +import java.util.List; +import java.util.stream.Stream; + +import io.fabric8.kubernetes.api.model.ObjectReference; +import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.cloud.kubernetes.commons.discovery.EndpointNameAndNamespace; + +/** + * @author wind57 + */ +class Fabric8CatalogWatchContextTests { + + @Test + void stateWithASingleElementNameNotNull() { + + Stream referenceStream = Stream + .of(new ObjectReferenceBuilder().withName("a").withNamespace("default").build()); + + List result = Fabric8CatalogWatchContext.state(referenceStream); + Assertions.assertEquals(result.size(), 1); + Assertions.assertEquals(result.get(0).endpointName(), "a"); + Assertions.assertEquals(result.get(0).namespace(), "default"); + + } + + @Test + void stateWithASingleElementNameNull() { + + Stream referenceStream = Stream + .of(new ObjectReferenceBuilder().withName(null).withNamespace("default").build()); + + List result = Fabric8CatalogWatchContext.state(referenceStream); + Assertions.assertEquals(result.size(), 1); + Assertions.assertNull(result.get(0).endpointName()); + Assertions.assertEquals(result.get(0).namespace(), "default"); + + } + + @Test + void stateWithTwoElementsNameNull() { + + Stream referenceStream = Stream.of( + new ObjectReferenceBuilder().withName(null).withNamespace("defaultNull").build(), + new ObjectReferenceBuilder().withName("a").withNamespace("defaultA").build()); + + List result = Fabric8CatalogWatchContext.state(referenceStream); + Assertions.assertEquals(result.size(), 2); + Assertions.assertEquals(result.get(0).endpointName(), "a"); + Assertions.assertEquals(result.get(0).namespace(), "defaultA"); + Assertions.assertNull(result.get(1).endpointName()); + Assertions.assertEquals(result.get(1).namespace(), "defaultNull"); + + } + +} diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshIT.java index a476fe027a..5eea8aa0a7 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshIT.java @@ -24,10 +24,11 @@ import io.kubernetes.client.openapi.models.V1ConfigMapBuilder; import io.kubernetes.client.openapi.models.V1Deployment; import io.kubernetes.client.openapi.models.V1Service; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; import org.testcontainers.k3s.K3sContainer; @@ -60,7 +61,7 @@ class ActuatorRefreshIT { private static Util util; - // @BeforeAll + @BeforeAll static void beforeAll() throws Exception { K3S.start(); Commons.validateImage(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S); @@ -71,7 +72,7 @@ static void beforeAll() throws Exception { configWatcher(Phase.CREATE); } - // @AfterAll + @AfterAll static void afterAll() throws Exception { configWatcher(Phase.DELETE); Commons.cleanUp(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S); @@ -97,7 +98,6 @@ void after() { */ // curl :8080/__admin/mappings @Test - @Disabled void testActuatorRefresh() { WireMock.configureFor(WIREMOCK_HOST, WIREMOCK_PORT, WIREMOCK_PATH); diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshMultipleNamespacesIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshMultipleNamespacesIT.java index 16a38a6273..c8ae889a26 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshMultipleNamespacesIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-configuration-watcher/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/ActuatorRefreshMultipleNamespacesIT.java @@ -32,7 +32,8 @@ import io.kubernetes.client.openapi.models.V1Secret; import io.kubernetes.client.openapi.models.V1SecretBuilder; import io.kubernetes.client.openapi.models.V1Service; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.k3s.K3sContainer; @@ -62,7 +63,7 @@ class ActuatorRefreshMultipleNamespacesIT { private static Util util; - // @BeforeAll + @BeforeAll static void beforeAll() throws Exception { K3S.start(); Commons.validateImage(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S); @@ -75,7 +76,7 @@ static void beforeAll() throws Exception { configWatcher(Phase.CREATE); } - // @AfterAll + @AfterAll static void afterAll() throws Exception { configWatcher(Phase.DELETE); util.wiremock(DEFAULT_NAMESPACE, "/", Phase.DELETE); @@ -97,7 +98,6 @@ static void afterAll() throws Exception { * */ @Test - @Disabled void testConfigMapActuatorRefreshMultipleNamespaces() { WireMock.configureFor(WIREMOCK_HOST, WIREMOCK_PORT, WIREMOCK_PATH); await().timeout(Duration.ofSeconds(60)) diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload-multiple-apps/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherMultipleAppsIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload-multiple-apps/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherMultipleAppsIT.java index 9655554f62..57c94a5b2b 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload-multiple-apps/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherMultipleAppsIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-kafka-configmap-reload-multiple-apps/kafka-configmap-test-app/src/test/java/org/springframework/cloud/kubernetes/configuration/watcher/multiple/apps/ConfigurationWatcherMultipleAppsIT.java @@ -91,7 +91,6 @@ static void afterAll() throws Exception { @BeforeEach void setup() { - util.zookeeper(NAMESPACE, Phase.CREATE); util.kafka(NAMESPACE, Phase.CREATE); appA(Phase.CREATE); appB(Phase.CREATE); @@ -100,7 +99,6 @@ void setup() { @AfterEach void afterEach() { - util.zookeeper(NAMESPACE, Phase.DELETE); util.kafka(NAMESPACE, Phase.DELETE); appA(Phase.DELETE); appB(Phase.DELETE); diff --git a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-loadbalancer/src/test/java/org/springframework/cloud/kubernetes/k8s/client/loadbalancer/LoadBalancerIT.java b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-loadbalancer/src/test/java/org/springframework/cloud/kubernetes/k8s/client/loadbalancer/LoadBalancerIT.java index e07081ad8a..85374aaaa1 100644 --- a/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-loadbalancer/src/test/java/org/springframework/cloud/kubernetes/k8s/client/loadbalancer/LoadBalancerIT.java +++ b/spring-cloud-kubernetes-integration-tests/spring-cloud-kubernetes-k8s-client-loadbalancer/src/test/java/org/springframework/cloud/kubernetes/k8s/client/loadbalancer/LoadBalancerIT.java @@ -76,8 +76,7 @@ class LoadBalancerIT { private static final String SERVICE_URL = "http://localhost:80/loadbalancer-it/service"; - private static final String SPRING_CLOUD_K8S_LOADBALANCER_APP_NAME = - "spring-cloud-kubernetes-k8s-client-loadbalancer"; + private static final String SPRING_CLOUD_K8S_LOADBALANCER_APP_NAME = "spring-cloud-kubernetes-k8s-client-loadbalancer"; private static final String NAMESPACE = "default"; @@ -155,8 +154,8 @@ private WebClient.Builder builder() { } private static void patchForServiceMode() { - patchWithMerge("spring-cloud-kubernetes-k8s-client-loadbalancer", LoadBalancerIT.NAMESPACE, - BODY_FOR_MERGE, POD_LABELS); + patchWithMerge("spring-cloud-kubernetes-k8s-client-loadbalancer", LoadBalancerIT.NAMESPACE, BODY_FOR_MERGE, + POD_LABELS); } } diff --git a/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/native_client/Util.java b/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/native_client/Util.java index b55751800f..55e39c7037 100644 --- a/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/native_client/Util.java +++ b/spring-cloud-kubernetes-test-support/src/main/java/org/springframework/cloud/kubernetes/integration/tests/commons/native_client/Util.java @@ -232,11 +232,14 @@ else if (phase.equals(Phase.DELETE)) { public void kafka(String namespace, Phase phase) { V1Deployment deployment = (V1Deployment) yaml("kafka/kafka-deployment.yaml"); V1Service service = (V1Service) yaml("kafka/kafka-service.yaml"); + V1ConfigMap configMap = (V1ConfigMap) yaml("kafka/kafka-configmap-startup-script.yaml"); if (phase.equals(Phase.CREATE)) { + createAndWait(namespace, configMap, null); createAndWait(namespace, "kafka", deployment, service, null, false); } else if (phase.equals(Phase.DELETE)) { + deleteAndWait(namespace, configMap, null); deleteAndWait(namespace, deployment, service, null); } } @@ -253,18 +256,6 @@ else if (phase.equals(Phase.DELETE)) { } } - public void zookeeper(String namespace, Phase phase) { - V1Deployment deployment = (V1Deployment) yaml("zookeeper/zookeeper-deployment.yaml"); - V1Service service = (V1Service) yaml("zookeeper/zookeeper-service.yaml"); - - if (phase.equals(Phase.CREATE)) { - createAndWait(namespace, "zookeeper", deployment, service, null, false); - } - else if (phase.equals(Phase.DELETE)) { - deleteAndWait(namespace, deployment, service, null); - } - } - /** * reads a yaml from classpath, fails if not found. */ diff --git a/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-configmap-startup-script.yaml b/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-configmap-startup-script.yaml new file mode 100644 index 0000000000..de6ffe2139 --- /dev/null +++ b/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-configmap-startup-script.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: kafka-start-config-script +data: + kafka_start.sh: | + #!/bin/sh + sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure + sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure + KAFKA_CLUSTER_ID="$(kafka-storage random-uuid)" + echo "kafka-storage format --ignore-formatted -t $KAFKA_CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure + /etc/confluent/docker/run diff --git a/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-deployment.yaml b/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-deployment.yaml index 6cce9e9181..cbe261d3d7 100644 --- a/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-deployment.yaml +++ b/spring-cloud-kubernetes-test-support/src/main/resources/kafka/kafka-deployment.yaml @@ -21,34 +21,39 @@ spec: # and this will cause this problem: https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/configure#L58-L62 # Another solution is to rename the service. enableServiceLinks: false + volumes: + - name: kafka-start-config-script + configMap: + name: kafka-start-config-script + defaultMode: 0744 containers: - - name: kafka + - name: kafka-start-config-script + volumeMounts: + - name: kafka-start-config-script + mountPath: /tmp + command: + - /tmp/./kafka_start.sh image: confluentinc/cp-kafka:7.2.1 ports: - containerPort: 9092 env: - - name: KAFKA_LISTENERS - value: "INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094" - + - name: KAFKA_BROKER_ID + value: "1" - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP - value: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT" - + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" - name: KAFKA_ADVERTISED_LISTENERS - value: "INTERNAL://kafka:9092,OUTSIDE://localhost:9094" - - - name: KAFKA_INTER_BROKER_LISTENER_NAME - value: "INTERNAL" - - - name: KAFKA_ADVERTISED_HOST_NAME - valueFrom: - fieldRef: - fieldPath: status.podIP - - - name: KAFKA_ZOOKEEPER_CONNECT - value: zookeeper:2181 - - # we have enabled auto creation of topics and when this happens there is a replication factor of 3 - # that is set automatically. Since we don't have that many, producers will fail. - # This setting ensures that there is just one replication + value: "PLAINTEXT://kafka:9092" - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR value: "1" + - name: KAFKA_PROCESS_ROLES + value: "broker,controller" + - name: KAFKA_NODE_ID + value: "1" + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092,CONTROLLER://:9093" + - name: KAFKA_INTER_BROKER_LISTENER_NAME + value: "PLAINTEXT" + - name: KAFKA_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_CONTROLLER_QUORUM_VOTERS + value: "1@localhost:9093" diff --git a/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-deployment.yaml b/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-deployment.yaml deleted file mode 100644 index 3c285c30f4..0000000000 --- a/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-deployment.yaml +++ /dev/null @@ -1,31 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - labels: - app: kafka - component: zookeeper - name: zookeeper -spec: - replicas: 1 - selector: - matchLabels: - app: kafka - component: zookeeper - template: - metadata: - labels: - app: kafka - component: zookeeper - spec: - containers: - - name: zookeeper - image: confluentinc/cp-zookeeper:7.2.1 - ports: - - containerPort: 2181 - env: - - name: ZOOKEEPER_ID - value: "1" - - name: ZOOKEEPER_SERVER_1 - value: zookeeper - - name: ZOOKEEPER_CLIENT_PORT - value: 2181 diff --git a/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-service.yaml b/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-service.yaml deleted file mode 100644 index eb025d3c4a..0000000000 --- a/spring-cloud-kubernetes-test-support/src/main/resources/zookeeper/zookeeper-service.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: zookeeper - labels: - app: kafka - component: zookeeper -spec: - ports: - - port: 2181 - name: zookeeper-port - targetPort: 2181 - protocol: TCP - selector: - app: kafka - component: zookeeper