From 3ff58711cf3578995e6d48d3ec2e933fc48f2a91 Mon Sep 17 00:00:00 2001 From: "Joseph (Ting-Chou) Lin" Date: Mon, 30 Oct 2023 15:36:24 -0700 Subject: [PATCH] [LI-CHERRY-PICK] KAFKA-13457: SocketChannel in Acceptor#accept is not closed upon IOException (#11504) (#486) This patch ensures that SocketChannel in Acceptor#accept is closed if an IOException is thrown while the socket is configured. Reviewers: Luke Chen , David Jacot Co-authored-by: Haoze Wu <18595686+functioner@users.noreply.github.com> --- .../scala/kafka/network/SocketServer.scala | 22 ++++++++---- .../unit/kafka/network/SocketServerTest.scala | 34 +++++++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 42f454b4842a8..690ab892aee22 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -84,7 +84,7 @@ class SocketServer(val config: KafkaConfig, private val maxQueuedRequests = config.queuedMaxRequests - private val nodeId = config.brokerId + protected val nodeId = config.brokerId private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ") @@ -291,7 +291,7 @@ class SocketServer(val config: KafkaConfig, } } - private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = { + protected def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) @@ -726,11 +726,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter) - socketChannel.configureBlocking(false) - socketChannel.socket().setTcpNoDelay(true) - socketChannel.socket().setKeepAlive(true) - if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) - socketChannel.socket().setSendBufferSize(sendBufferSize) + configureAcceptedSocketChannel(socketChannel) Some(socketChannel) } catch { case e: TooManyConnectionsException => @@ -743,9 +739,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs) None + case e: IOException => + error(s"Encountered an error while configuring the connection, closing it.", e) + close(endPoint.listenerName, socketChannel) + None } } + protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = { + socketChannel.configureBlocking(false) + socketChannel.socket().setTcpNoDelay(true) + socketChannel.socket().setKeepAlive(true) + if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + socketChannel.socket().setSendBufferSize(sendBufferSize) + } + /** * Close sockets for any connections that have been throttled. */ diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6e5daebb984bd..1183a31fde513 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNod import com.yammer.metrics.core.{Gauge, Meter} import javax.net.ssl._ +import kafka.cluster.EndPoint import kafka.metrics.KafkaYammerMetrics import kafka.security.CredentialProvider import kafka.server.{KafkaConfig, Observer, SimpleApiVersionManager, ThrottleCallback, ThrottledChannel} @@ -873,6 +874,39 @@ class SocketServerTest { } } + @Test + def testExceptionInAcceptor(): Unit = { + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val serverMetrics = new Metrics() + + val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, + Time.SYSTEM, credentialProvider, observer, apiVersionManager) { + + // same as SocketServer.createAcceptor, + // except the Acceptor overriding a method to inject the exception + override protected def createAcceptor(endPoint: EndPoint, metricPrefix: String): Acceptor = { + val sendBufferSize = config.socketSendBufferBytes + val recvBufferSize = config.socketReceiveBufferBytes + new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) { + override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = { + assertEquals(1, connectionQuotas.get(socketChannel.socket.getInetAddress)) + throw new IOException("test injected IOException") + } + } + } + } + + try { + overrideServer.startup() + val conn = connect(overrideServer) + conn.setSoTimeout(3000) + assertEquals(-1, conn.getInputStream.read()) + assertEquals(0, overrideServer.connectionQuotas.get(conn.getInetAddress)) + } finally { + shutdownServerAndMetrics(overrideServer) + } + } + @Test def testConnectionRatePerIp(): Unit = { val defaultTimeoutMs = 2000