From 7b30c5efa852ce5c7ed1bb64cde45ee1bf849f26 Mon Sep 17 00:00:00 2001 From: Ze Mao Date: Fri, 21 Jun 2019 15:25:26 -0700 Subject: [PATCH] KAFKA-8583: Optimization for SslTransportLayer#write(ByteBuffer) Warp data as many as possible in SslTransportLayer#write(ByteBuffer) The change comes from Ambry, whose TransportLayer code is same with Kafka. https://github.com/linkedin/ambry/pull/1105 --- .../common/network/SslTransportLayer.java | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 08a39e71d50ba..e0c596b44b5d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -630,37 +630,42 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { */ @Override public int write(ByteBuffer src) throws IOException { - int written = 0; if (state == State.CLOSING) throw closingException(); if (state != State.READY) - return written; - + return 0; if (!flush(netWriteBuffer)) - return written; + return 0; - netWriteBuffer.clear(); - SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); - netWriteBuffer.flip(); + int written = 0; + while (src.remaining() != 0) { + netWriteBuffer.clear(); + SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer); + netWriteBuffer.flip(); - //handle ssl renegotiation - if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) - throw renegotiationException(); + //handle ssl renegotiation + if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK) + throw renegotiationException(); - if (wrapResult.getStatus() == Status.OK) { - written = wrapResult.bytesConsumed(); - flush(netWriteBuffer); - } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) { - int currentNetWriteBufferSize = netWriteBufferSize(); - netWriteBuffer.compact(); - netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize); - netWriteBuffer.flip(); - if (netWriteBuffer.limit() >= currentNetWriteBufferSize) - throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")"); - } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) { - throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write"); - } else if (wrapResult.getStatus() == Status.CLOSED) { - throw new EOFException(); + if (wrapResult.getStatus() == Status.OK) { + written += wrapResult.bytesConsumed(); + if (!flush(netWriteBuffer)) { + // break if socketChannel can't accept all data in netWriteBuffer + break; + } + // otherwise, we are safe to go to next iteration. + } else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) { + int currentNetWriteBufferSize = netWriteBufferSize(); + netWriteBuffer.compact(); + netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize); + netWriteBuffer.flip(); + if (netWriteBuffer.limit() >= currentNetWriteBufferSize) throw new IllegalStateException( + "SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")"); + } else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) { + throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write"); + } else if (wrapResult.getStatus() == Status.CLOSED) { + throw new EOFException(); + } } return written; }