Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean KafkaBrokerConfigurationBuilder class after ZooKeeper removal #10998

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public KafkaBrokerConfigurationBuilder(Reconciliation reconciliation, NodeRef no
}

/**
* Renders the broker.id or node.id configurations
* Renders the node ID configurations
*/
private void configureNodeOrBrokerId() {
printSectionHeader("Node / Broker ID");
printSectionHeader("Node ID");
writer.println("node.id=" + node.nodeId());
writer.println();
}
Expand Down Expand Up @@ -204,7 +204,6 @@ public KafkaBrokerConfigurationBuilder withKRaft(String clusterName, String name
* This is used to configure the user-configurable listeners.
* @return Returns the builder instance
*/
@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
public KafkaBrokerConfigurationBuilder withListeners(
String clusterName,
KafkaVersion kafkaVersion,
Expand All @@ -217,9 +216,19 @@ public KafkaBrokerConfigurationBuilder withListeners(
List<String> advertisedListeners = new ArrayList<>();
List<String> securityProtocol = new ArrayList<>();

boolean isKraftControllerOnly = node.controller() && !node.broker();
////////////////////
// Listeners that are on all nodes
////////////////////

// Control plane listener is configured for all nodes. Even brokers need to connect and talk to controllers, so
// they need to know what is the security protocol and security configuration
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":SSL");
configureControlPlaneListener();

////////////////////
// Listeners for nodes with controller role
////////////////////

// Control Plane listener is set for pure KRaft controller or combined node
if (node.controller()) {
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");

Expand All @@ -233,33 +242,22 @@ public KafkaBrokerConfigurationBuilder withListeners(
}
}

// Security protocol and Control Plane Listener are configured everywhere

// Brokers need to know how to connect to the controllers on the Control Plane listener and what security (encryption/authentication) they should use.
// For that reason, we have to configure the Control Plane listener in the broker-only configuration as well,
// even though they do not listen at the Control Plane listener port.
// The brokers use this configuration to detect how to connect to the controllers, what certificates to use etc.
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":SSL");
// Control Plane listener is configured on KRaft broker only nodes as well for allowing TLS certificates keystore generation
// so that brokers are able to connect to controllers as TLS clients
configureControlPlaneListener();
////////////////////
// Listeners for nodes with broker role
////////////////////

// Replication Listener to be configured on brokers
if (node.broker()) {
// Replication Listener to be configured only on brokers
securityProtocol.add(REPLICATION_LISTENER_NAME + ":SSL");
configureReplicationListener();
}

// Non-controller listeners are used only on brokers (including mixed nodes)
if (!isKraftControllerOnly) {
// Replication listener
listeners.add(REPLICATION_LISTENER_NAME + "://0.0.0.0:9091");
advertisedListeners.add(String.format("%s://%s:9091",
REPLICATION_LISTENER_NAME,
// Pod name constructed to be templatable for each individual ordinal
DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName), node.podName())
));
configureReplicationListener();

// User-configured listeners
for (GenericKafkaListener listener : kafkaListeners) {
int port = listener.getPort();
String listenerName = ListenersUtils.identifier(listener).toUpperCase(Locale.ENGLISH);
Expand All @@ -285,20 +283,25 @@ public KafkaBrokerConfigurationBuilder withListeners(
}
}

////////////////////
// Shared configurations with values dependent on all listeners
////////////////////

// configure OAuth principal builder for all the nodes - brokers, controllers, and mixed
configureOAuthPrincipalBuilderIfNeeded(writer, kafkaListeners);

printSectionHeader("Common listener configuration");
writer.println("listener.security.protocol.map=" + String.join(",", securityProtocol));
writer.println("listeners=" + String.join(",", listeners));

if (!isKraftControllerOnly) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
if (node.broker()) {
// Inter-broker listener is configured only for nodes with broker role
writer.println("inter.broker.listener.name=" + REPLICATION_LISTENER_NAME);
} else if (node.controller()) {
if (advertisedListeners.size() > 0) {
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}
}

if (!advertisedListeners.isEmpty()) {
// Advertised listeners might be empty for controller-only nodes with Kafka versions older than 3.9.0
writer.println("advertised.listeners=" + String.join(",", advertisedListeners));
}

writer.println("sasl.enabled.mechanisms=");
Expand All @@ -323,21 +326,17 @@ private void configureOAuthPrincipalBuilderIfNeeded(PrintWriter writer, List<Gen
* rather static, it always uses TLS with TLS client auth.
*/
private void configureControlPlaneListener() {
final String controlPlaneListenerName = CONTROL_PLANE_LISTENER_NAME.toLowerCase(Locale.ENGLISH);

printSectionHeader("Control Plane listener");
configureListener(controlPlaneListenerName);
configureListener(CONTROL_PLANE_LISTENER_NAME.toLowerCase(Locale.ENGLISH));
}

/**
* Internal method which configures the replication listener. The replication listener configuration is currently
* rather static, it always uses TLS with TLS client auth.
*/
private void configureReplicationListener() {
final String replicationListenerName = REPLICATION_LISTENER_NAME.toLowerCase(Locale.ENGLISH);

printSectionHeader("Replication listener");
configureListener(replicationListenerName);
configureListener(REPLICATION_LISTENER_NAME.toLowerCase(Locale.ENGLISH));
}

/**
Expand Down Expand Up @@ -365,7 +364,7 @@ private void configureListener(String listenerName) {
*/
private void configureListener(String listenerName, GenericKafkaListenerConfiguration configuration) {
if (configuration != null) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);

if (configuration.getMaxConnections() != null) {
writer.println(String.format("listener.name.%s.max.connections=%d", listenerNameInProperty, configuration.getMaxConnections()));
Expand All @@ -384,7 +383,7 @@ private void configureListener(String listenerName, GenericKafkaListenerConfigur
* @param serverCertificate The custom certificate configuration (null if not specified by the user in the Kafka CR)
*/
private void configureTls(String listenerName, CertAndKeySecretSource serverCertificate) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);

if (serverCertificate != null) {
writer.println(String.format("listener.name.%s.ssl.keystore.location=/tmp/kafka/custom-%s.keystore.p12", listenerNameInProperty, listenerNameInProperty));
Expand All @@ -408,8 +407,8 @@ private void configureTls(String listenerName, CertAndKeySecretSource serverCert
* @param auth The authentication configuration from the Kafka CR
*/
private void configureAuthentication(String listenerName, List<String> securityProtocol, boolean tls, KafkaListenerAuthentication auth) {
String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
String listenerNameInEnvVar = listenerName.replace("-", "_");
final String listenerNameInProperty = listenerName.toLowerCase(Locale.ENGLISH);
final String listenerNameInEnvVar = listenerName.replace("-", "_");

if (auth instanceof KafkaListenerAuthenticationOAuth oauth) {
securityProtocol.add(String.format("%s:%s", listenerName, getSecurityProtocol(tls, true)));
Expand All @@ -421,7 +420,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
addOptionIfNotNull(jaasOptions, "oauth.client.secret", String.format(PLACEHOLDER_OAUTH_CLIENT_SECRET, listenerNameInEnvVar));
}

if (oauth.getTlsTrustedCertificates() != null && oauth.getTlsTrustedCertificates().size() > 0) {
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.location", String.format("/tmp/kafka/oauth-%s.truststore.p12", listenerNameInProperty));
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.password", PLACEHOLDER_CERT_STORE_PASSWORD);
addOptionIfNotNull(jaasOptions, "oauth.ssl.truststore.type", "PKCS12");
Expand All @@ -443,7 +442,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
writer.println(String.format("listener.name.%s.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler", listenerNameInProperty));
writer.println(String.format("listener.name.%s.plain.sasl.jaas.config=%s", listenerNameInProperty,
AuthenticationUtils.jaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule", jaasOptions)));
if (enabledMechanisms.length() > 0) {
if (!enabledMechanisms.isEmpty()) {
enabledMechanisms.append(",");
}
enabledMechanisms.append("PLAIN");
Expand Down Expand Up @@ -474,7 +473,7 @@ private void configureAuthentication(String listenerName, List<String> securityP
securityProtocol.add(String.format("%s:%s", listenerName, getSecurityProtocol(tls, customAuth.isSasl())));
Map<String, Object> listenerConfig = customAuth.getListenerConfig();
if (listenerConfig == null) {
listenerConfig = new HashMap<String, Object>();
listenerConfig = new HashMap<>();
}
KafkaListenerCustomAuthConfiguration config = new KafkaListenerCustomAuthConfiguration(reconciliation, listenerConfig.entrySet());
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println(String.format("listener.name.%s.%s=%s", listenerNameInProperty, key, value)));
Expand Down Expand Up @@ -603,7 +602,7 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf
if (authorization != null) {
List<String> superUsers = new ArrayList<>();

// Broker super users
// Broker superusers
superUsers.add(String.format("User:CN=%s,O=io.strimzi", KafkaResources.kafkaComponentName(clusterName)));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-topic-operator"));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-user-operator"));
Expand Down Expand Up @@ -650,7 +649,7 @@ private void configureSimpleAuthorization(KafkaAuthorizationSimple authorization
writer.println("authorizer.class.name=" + KafkaAuthorizationSimple.KRAFT_AUTHORIZER_CLASS_NAME);

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand All @@ -671,14 +670,14 @@ private void configureOpaAuthorization(KafkaAuthorizationOpa authorization, List
writer.println(String.format("%s=%d", "opa.authorizer.cache.maximum.size", authorization.getMaximumCacheSize()));
writer.println(String.format("%s=%d", "opa.authorizer.cache.expire.after.seconds", Duration.ofMillis(authorization.getExpireAfterMs()).getSeconds()));

if (authorization.getTlsTrustedCertificates() != null && authorization.getTlsTrustedCertificates().size() > 0) {
if (authorization.getTlsTrustedCertificates() != null && !authorization.getTlsTrustedCertificates().isEmpty()) {
writer.println("opa.authorizer.truststore.path=/tmp/kafka/authz-opa.truststore.p12");
writer.println("opa.authorizer.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("opa.authorizer.truststore.type=PKCS12");
}

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand Down Expand Up @@ -714,7 +713,7 @@ private void configureKeycloakAuthorization(String clusterName, KafkaAuthorizati

writer.println("strimzi.authorization.kafka.cluster.name=" + clusterName);

if (authorization.getTlsTrustedCertificates() != null && authorization.getTlsTrustedCertificates().size() > 0) {
if (authorization.getTlsTrustedCertificates() != null && !authorization.getTlsTrustedCertificates().isEmpty()) {
writer.println("strimzi.authorization.ssl.truststore.location=/tmp/kafka/authz-keycloak.truststore.p12");
writer.println("strimzi.authorization.ssl.truststore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD);
writer.println("strimzi.authorization.ssl.truststore.type=PKCS12");
Expand All @@ -723,7 +722,7 @@ private void configureKeycloakAuthorization(String clusterName, KafkaAuthorizati
}

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand All @@ -738,7 +737,7 @@ private void configureCustomAuthorization(KafkaAuthorizationCustom authorization
writer.println("authorizer.class.name=" + authorization.getAuthorizerClass());

// User configured super-users
if (authorization.getSuperUsers() != null && authorization.getSuperUsers().size() > 0) {
if (authorization.getSuperUsers() != null && !authorization.getSuperUsers().isEmpty()) {
superUsers.addAll(authorization.getSuperUsers().stream().map(e -> String.format("User:%s", e)).toList());
}
}
Expand Down
Loading