Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
wind57 committed Nov 3, 2023
2 parents 882f9c1 + 7fc7900 commit 6c9071e
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.cloud.kubernetes.fabric8.discovery;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
Expand All @@ -28,6 +27,9 @@
import org.springframework.cloud.kubernetes.commons.discovery.EndpointNameAndNamespace;
import org.springframework.cloud.kubernetes.commons.discovery.KubernetesDiscoveryProperties;

import static java.util.Comparator.comparing;
import static java.util.Comparator.nullsLast;

/**
* A simple holder for some instances needed for either Endpoints or EndpointSlice catalog
* implementations.
Expand All @@ -39,7 +41,7 @@ record Fabric8CatalogWatchContext(KubernetesClient kubernetesClient, KubernetesD

static List<EndpointNameAndNamespace> state(Stream<ObjectReference> references) {
return references.filter(Objects::nonNull).map(x -> new EndpointNameAndNamespace(x.getName(), x.getNamespace()))
.sorted(Comparator.comparing(EndpointNameAndNamespace::endpointName, String::compareTo)).toList();
.sorted(comparing(EndpointNameAndNamespace::endpointName, nullsLast(String::compareTo))).toList();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.kubernetes.fabric8.discovery;

import java.util.List;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.springframework.cloud.kubernetes.commons.discovery.EndpointNameAndNamespace;

/**
* @author wind57
*/
class Fabric8CatalogWatchContextTests {

@Test
void stateWithASingleElementNameNotNull() {

Stream<ObjectReference> referenceStream = Stream
.of(new ObjectReferenceBuilder().withName("a").withNamespace("default").build());

List<EndpointNameAndNamespace> result = Fabric8CatalogWatchContext.state(referenceStream);
Assertions.assertEquals(result.size(), 1);
Assertions.assertEquals(result.get(0).endpointName(), "a");
Assertions.assertEquals(result.get(0).namespace(), "default");

}

@Test
void stateWithASingleElementNameNull() {

Stream<ObjectReference> referenceStream = Stream
.of(new ObjectReferenceBuilder().withName(null).withNamespace("default").build());

List<EndpointNameAndNamespace> result = Fabric8CatalogWatchContext.state(referenceStream);
Assertions.assertEquals(result.size(), 1);
Assertions.assertNull(result.get(0).endpointName());
Assertions.assertEquals(result.get(0).namespace(), "default");

}

@Test
void stateWithTwoElementsNameNull() {

Stream<ObjectReference> referenceStream = Stream.of(
new ObjectReferenceBuilder().withName(null).withNamespace("defaultNull").build(),
new ObjectReferenceBuilder().withName("a").withNamespace("defaultA").build());

List<EndpointNameAndNamespace> result = Fabric8CatalogWatchContext.state(referenceStream);
Assertions.assertEquals(result.size(), 2);
Assertions.assertEquals(result.get(0).endpointName(), "a");
Assertions.assertEquals(result.get(0).namespace(), "defaultA");
Assertions.assertNull(result.get(1).endpointName());
Assertions.assertEquals(result.get(1).namespace(), "defaultNull");

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1Service;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.k3s.K3sContainer;
Expand Down Expand Up @@ -60,7 +61,7 @@ class ActuatorRefreshIT {

private static Util util;

// @BeforeAll
@BeforeAll
static void beforeAll() throws Exception {
K3S.start();
Commons.validateImage(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S);
Expand All @@ -71,7 +72,7 @@ static void beforeAll() throws Exception {
configWatcher(Phase.CREATE);
}

// @AfterAll
@AfterAll
static void afterAll() throws Exception {
configWatcher(Phase.DELETE);
Commons.cleanUp(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S);
Expand All @@ -97,7 +98,6 @@ void after() {
*/
// curl <WIREMOCK_POD_IP>:8080/__admin/mappings
@Test
@Disabled
void testActuatorRefresh() {

WireMock.configureFor(WIREMOCK_HOST, WIREMOCK_PORT, WIREMOCK_PATH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import io.kubernetes.client.openapi.models.V1Secret;
import io.kubernetes.client.openapi.models.V1SecretBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.k3s.K3sContainer;

Expand Down Expand Up @@ -62,7 +63,7 @@ class ActuatorRefreshMultipleNamespacesIT {

private static Util util;

// @BeforeAll
@BeforeAll
static void beforeAll() throws Exception {
K3S.start();
Commons.validateImage(SPRING_CLOUD_K8S_CONFIG_WATCHER_APP_NAME, K3S);
Expand All @@ -75,7 +76,7 @@ static void beforeAll() throws Exception {
configWatcher(Phase.CREATE);
}

// @AfterAll
@AfterAll
static void afterAll() throws Exception {
configWatcher(Phase.DELETE);
util.wiremock(DEFAULT_NAMESPACE, "/", Phase.DELETE);
Expand All @@ -97,7 +98,6 @@ static void afterAll() throws Exception {
* </pre>
*/
@Test
@Disabled
void testConfigMapActuatorRefreshMultipleNamespaces() {
WireMock.configureFor(WIREMOCK_HOST, WIREMOCK_PORT, WIREMOCK_PATH);
await().timeout(Duration.ofSeconds(60))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ static void afterAll() throws Exception {

@BeforeEach
void setup() {
util.zookeeper(NAMESPACE, Phase.CREATE);
util.kafka(NAMESPACE, Phase.CREATE);
appA(Phase.CREATE);
appB(Phase.CREATE);
Expand All @@ -100,7 +99,6 @@ void setup() {

@AfterEach
void afterEach() {
util.zookeeper(NAMESPACE, Phase.DELETE);
util.kafka(NAMESPACE, Phase.DELETE);
appA(Phase.DELETE);
appB(Phase.DELETE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class LoadBalancerIT {

private static final String SERVICE_URL = "http://localhost:80/loadbalancer-it/service";

private static final String SPRING_CLOUD_K8S_LOADBALANCER_APP_NAME =
"spring-cloud-kubernetes-k8s-client-loadbalancer";
private static final String SPRING_CLOUD_K8S_LOADBALANCER_APP_NAME = "spring-cloud-kubernetes-k8s-client-loadbalancer";

private static final String NAMESPACE = "default";

Expand Down Expand Up @@ -155,8 +154,8 @@ private WebClient.Builder builder() {
}

private static void patchForServiceMode() {
patchWithMerge("spring-cloud-kubernetes-k8s-client-loadbalancer", LoadBalancerIT.NAMESPACE,
BODY_FOR_MERGE, POD_LABELS);
patchWithMerge("spring-cloud-kubernetes-k8s-client-loadbalancer", LoadBalancerIT.NAMESPACE, BODY_FOR_MERGE,
POD_LABELS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,14 @@ else if (phase.equals(Phase.DELETE)) {
public void kafka(String namespace, Phase phase) {
V1Deployment deployment = (V1Deployment) yaml("kafka/kafka-deployment.yaml");
V1Service service = (V1Service) yaml("kafka/kafka-service.yaml");
V1ConfigMap configMap = (V1ConfigMap) yaml("kafka/kafka-configmap-startup-script.yaml");

if (phase.equals(Phase.CREATE)) {
createAndWait(namespace, configMap, null);
createAndWait(namespace, "kafka", deployment, service, null, false);
}
else if (phase.equals(Phase.DELETE)) {
deleteAndWait(namespace, configMap, null);
deleteAndWait(namespace, deployment, service, null);
}
}
Expand All @@ -253,18 +256,6 @@ else if (phase.equals(Phase.DELETE)) {
}
}

public void zookeeper(String namespace, Phase phase) {
V1Deployment deployment = (V1Deployment) yaml("zookeeper/zookeeper-deployment.yaml");
V1Service service = (V1Service) yaml("zookeeper/zookeeper-service.yaml");

if (phase.equals(Phase.CREATE)) {
createAndWait(namespace, "zookeeper", deployment, service, null, false);
}
else if (phase.equals(Phase.DELETE)) {
deleteAndWait(namespace, deployment, service, null);
}
}

/**
* reads a yaml from classpath, fails if not found.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-start-config-script
data:
kafka_start.sh: |
#!/bin/sh
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
KAFKA_CLUSTER_ID="$(kafka-storage random-uuid)"
echo "kafka-storage format --ignore-formatted -t $KAFKA_CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
/etc/confluent/docker/run
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,39 @@ spec:
# and this will cause this problem: https://github.com/confluentinc/cp-docker-images/blob/master/debian/kafka/include/etc/confluent/docker/configure#L58-L62
# Another solution is to rename the service.
enableServiceLinks: false
volumes:
- name: kafka-start-config-script
configMap:
name: kafka-start-config-script
defaultMode: 0744
containers:
- name: kafka
- name: kafka-start-config-script
volumeMounts:
- name: kafka-start-config-script
mountPath: /tmp
command:
- /tmp/./kafka_start.sh
image: confluentinc/cp-kafka:7.2.1
ports:
- containerPort: 9092
env:
- name: KAFKA_LISTENERS
value: "INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094"

- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"

value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
- name: KAFKA_ADVERTISED_LISTENERS
value: "INTERNAL://kafka:9092,OUTSIDE://localhost:9094"

- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "INTERNAL"

- name: KAFKA_ADVERTISED_HOST_NAME
valueFrom:
fieldRef:
fieldPath: status.podIP

- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper:2181

# we have enabled auto creation of topics and when this happens there is a replication factor of 3
# that is set automatically. Since we don't have that many, producers will fail.
# This setting ensures that there is just one replication
value: "PLAINTEXT://kafka:9092"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_PROCESS_ROLES
value: "broker,controller"
- name: KAFKA_NODE_ID
value: "1"
- name: KAFKA_LISTENERS
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "PLAINTEXT"
- name: KAFKA_CONTROLLER_LISTENER_NAMES
value: "CONTROLLER"
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
value: "1@localhost:9093"

This file was deleted.

This file was deleted.

0 comments on commit 6c9071e

Please sign in to comment.