Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick upstreams #483

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private enum State {
CLOSING
}

private static final String TLS13 = "TLSv1.3";

private final String channelId;
private final SSLEngine sslEngine;
private final SelectionKey key;
Expand Down Expand Up @@ -461,7 +463,7 @@ private void handshakeFinished() throws IOException {
if (netWriteBuffer.hasRemaining())
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
else {
state = sslEngine.getSession().getProtocol().equals("TLSv1.3") ? State.POST_HANDSHAKE : State.READY;
state = sslEngine.getSession().getProtocol().equals(TLS13) ? State.POST_HANDSHAKE : State.READY;
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
SSLSession session = sslEngine.getSession();
log.debug("SSL handshake completed successfully with peerHost '{}' peerPort {} peerPrincipal '{}' cipherSuite '{}'",
Expand Down Expand Up @@ -593,10 +595,11 @@ public int read(ByteBuffer dst) throws IOException {
throw e;
}
netReadBuffer.compact();
// handle ssl renegotiation.
// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
if (unwrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
unwrapResult.getHandshakeStatus() != HandshakeStatus.FINISHED &&
unwrapResult.getStatus() == Status.OK) {
unwrapResult.getStatus() == Status.OK &&
!sslEngine.getSession().getProtocol().equals(TLS13)) {
log.error("Renegotiation requested, but it is not supported, channelId {}, " +
"appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {} handshakeStatus {}", channelId,
appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position(), unwrapResult.getHandshakeStatus());
Expand Down Expand Up @@ -720,9 +723,12 @@ public int write(ByteBuffer src) throws IOException {
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
netWriteBuffer.flip();

//handle ssl renegotiation
if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
// reject renegotiation if TLS < 1.3, key updates for TLS 1.3 are allowed
if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING &&
wrapResult.getStatus() == Status.OK &&
!sslEngine.getSession().getProtocol().equals(TLS13)) {
throw renegotiationException();
}

if (wrapResult.getStatus() == Status.OK) {
written += wrapResult.bytesConsumed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;

import java.io.IOException;
Expand All @@ -50,6 +51,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -59,6 +62,8 @@
*
*/
public class NioEchoServer extends Thread {
private final static Logger LOG = LoggerFactory.getLogger(NioEchoServer.class);

public enum MetricType {
TOTAL, RATE, AVG, MAX;

Expand Down Expand Up @@ -243,7 +248,7 @@ public void run() {
}
}
} catch (IOException e) {
// ignore
LOG.warn(e.getMessage(), e);
}
}

Expand Down Expand Up @@ -346,6 +351,7 @@ public void closeSocketChannels() throws IOException {
public void close() throws IOException, InterruptedException {
this.serverSocketChannel.close();
closeSocketChannels();
Utils.closeQuietly(selector, "selector");
acceptorThread.interrupt();
acceptorThread.join();
interrupt();
Expand All @@ -358,8 +364,10 @@ public AcceptorThread() {
}
@Override
public void run() {
java.nio.channels.Selector acceptSelector = null;

try {
java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
acceptSelector = java.nio.channels.Selector.open();
serverSocketChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
while (serverSocketChannel.isOpen()) {
if (acceptSelector.select(1000) > 0) {
Expand All @@ -377,7 +385,9 @@ public void run() {
}
}
} catch (IOException e) {
// ignore
LOG.warn(e.getMessage(), e);
} finally {
Utils.closeQuietly(acceptSelector, "acceptSelector");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ public void tearDown() throws Exception {
}
}

public SecurityProtocol securityProtocol() {
return SecurityProtocol.PLAINTEXT;
}

protected Map<String, Object> clientConfigs() {
return new HashMap<>();
}
Expand Down Expand Up @@ -1019,7 +1015,6 @@ public void testChannelCloseWhileProcessingReceives() throws Exception {

private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);
while (true) {
selector.poll(1000L);
for (NetworkReceive receive : selector.completedReceives())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.common.network;

import java.nio.channels.SelectionKey;
import java.security.GeneralSecurityException;
import javax.net.ssl.SSLEngine;

import org.apache.kafka.common.config.SecurityConfig;
Expand Down Expand Up @@ -46,11 +47,9 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -64,7 +63,7 @@
/**
* A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
*/
public class SslSelectorTest extends SelectorTest {
public abstract class SslSelectorTest extends SelectorTest {

private Map<String, Object> sslClientConfigs;

Expand All @@ -76,7 +75,7 @@ public void setUp() throws Exception {
this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
this.server.start();
this.time = new MockTime();
sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
sslClientConfigs = createSslClientConfigs(trustStoreFile);
LogContext logContext = new LogContext();
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext);
this.channelBuilder.configure(sslClientConfigs);
Expand All @@ -85,26 +84,22 @@ public void setUp() throws Exception {
this.sensor = metrics.sensor("infoSensor", new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), Sensor.RecordingLevel.INFO);
}

protected abstract Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException;

@AfterEach
public void tearDown() throws Exception {
this.selector.close();
this.server.close();
this.metrics.close();
}

@Override
public SecurityProtocol securityProtocol() {
return SecurityProtocol.PLAINTEXT;
}

@Override
protected Map<String, Object> clientConfigs() {
return sslClientConfigs;
}

@Test
public void testConnectionWithCustomKeyManager() throws Exception {

TestProviderCreator testProviderCreator = new TestProviderCreator();

int requestSize = 100 * 1024;
Expand Down Expand Up @@ -256,35 +251,6 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyCon
verifySelectorEmpty();
}

/**
* Renegotiation is not supported since it is potentially unsafe and it has been removed in TLS 1.3
*/
@Test
public void testRenegotiationFails() throws Exception {
String node = "0";
// create connections
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

// send echo requests and receive responses
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(createSend(node, node + "-" + 0));
selector.poll(0L);
server.renegotiate();
selector.send(createSend(node, node + "-" + 1));
long expiryTime = System.currentTimeMillis() + 2000;

List<String> disconnected = new ArrayList<>();
while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
selector.poll(10);
disconnected.addAll(selector.disconnected().keySet());
}
assertTrue(disconnected.contains(node), "Renegotiation should cause disconnection");

}

@Override
@Test
public void testMuteOnOOM() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.network;

import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.test.TestSslUtils;
import org.junit.jupiter.api.Test;

public class Tls12SelectorTest extends SslSelectorTest {

@Override
protected Map<String, Object> createSslClientConfigs(File trustStoreFile)
throws GeneralSecurityException, IOException {
Map<String, Object> configs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT,
trustStoreFile, "client");
configs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, asList("TLSv1.2"));
return configs;
}

/**
* Renegotiation is not supported when TLS 1.2 is used (renegotiation was removed from TLS 1.3)
*/
@Test
public void testRenegotiationFails() throws Exception {
String node = "0";
// create connections
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);

// send echo requests and receive responses
while (!selector.isChannelReady(node)) {
selector.poll(1000L);
}
selector.send(createSend(node, node + "-" + 0));
selector.poll(0L);
server.renegotiate();
selector.send(createSend(node, node + "-" + 1));
long expiryTime = System.currentTimeMillis() + 2000;

List<String> disconnected = new ArrayList<>();
while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
selector.poll(10);
disconnected.addAll(selector.disconnected().keySet());
}
assertTrue(disconnected.contains(node), "Renegotiation should cause disconnection");
}
}
Loading
Loading