Skip to content

Commit

Permalink
Fix TLS stability issues with V2 protocol that caused data corruption (
Browse files Browse the repository at this point in the history
…apache#4404)

* Fix TLS stability issues with V2 protocol that caused data corruption
- add the TLS handler after the FlushConsolidationHandler
  - This makes TLS connections from Pulsar Broker to Bookkeeper stable
    when bookkeeperUseV2WireProtocol=true is used
- Fix test TestTLS for V2
- Fix inconsistency in client configuration in BookKeeperClusterTestCase
  • Loading branch information
lhotari authored May 29, 2024
1 parent 2970aef commit 5f73147
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
class BookieNettyServer {

private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class);
public static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final int maxFrameSize;
final ServerConfiguration conf;
Expand Down Expand Up @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
new BookieSideConnectionPeerContextHandler();
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));

pipeline.addLast("bytebufList", ByteBufList.ENCODER);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
public class BookieRequestProcessor implements RequestProcessor {

private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
public static final String TLS_HANDLER_NAME = "tls";

/**
* The server configuration. We use this for getting the number of add and read
Expand Down Expand Up @@ -580,9 +581,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r,
response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
writeAndFlush(c, response.build());
} else {
LOG.info("Starting TLS handshake with client on channel {}", c);
// there is no need to execute in a different thread as this operation is light
SslHandler sslHandler = shFactory.newTLSHandler();
c.pipeline().addFirst("tls", sslHandler);
if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) {
c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler);
}

response.setStatus(BookkeeperProtocol.StatusCode.EOK);
BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
static final String CONSOLIDATION_HANDLER_NAME = "consolidation";

final BookieId bookieId;
final BookieAddressResolver bookieAddressResolver;
Expand Down Expand Up @@ -595,7 +596,7 @@ protected ChannelFuture connect() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true));
pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true));
pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
Expand Down Expand Up @@ -1573,9 +1574,16 @@ void initTLSHandshake() {
} else {
throw new RuntimeException("Unexpected socket address type");
}
SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler);
handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort());
SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
String sslHandlerName = parentObj.shFactory.getHandlerName();
if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) {
channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler);
} else {
// local transport doesn't contain FlushConsolidationHandler
channel.pipeline().addFirst(sslHandlerName, sslHandler);
}
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
int rc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception {
}

protected ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
return new ClientConfiguration(baseClientConf);
}

protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception {
*/
@Test
public void testConnectToLocalTLSClusterTLSClient() throws Exception {
// skip test
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setDisableServerSocketBind(true);
c.setEnableLocalTransport(true);
Expand Down Expand Up @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio
*/
@Test
public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception {
if (useV2Protocol) {
return;
}

restartBookies(c -> {
c.setBookieAuthProviderFactoryClass(
AllowOnlyClientsWithX509Certificates.class.getName());
Expand Down Expand Up @@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception
testClient(clientConf, numBookies);
fail("Shouldn't be able to connect");
} catch (BKException.BKUnauthorizedAccessException authFailed) {
} catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) {
if (!useV2Protocol) {
fail("Unexpected exception occurred.");
}
}

assertFalse(secureBookieSideChannel);
Expand Down

0 comments on commit 5f73147

Please sign in to comment.