diff --git a/CHANGELOG.md b/CHANGELOG.md index 38b67823..a3341689 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 3.9.0 + - Netty boss and worker groups are separated [#178](https://github.com/logstash-plugins/logstash-input-http/pull/178) + As a result, when shutdown requested incoming connections are closed first and improved graceful shutdown + ## 3.8.1 - bump netty to 4.1.109 [#173](https://github.com/logstash-plugins/logstash-input-http/pull/173) diff --git a/VERSION b/VERSION index f2807196..a5c4c763 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.8.1 +3.9.0 diff --git a/src/main/java/org/logstash/plugins/inputs/http/NettyHttpServer.java b/src/main/java/org/logstash/plugins/inputs/http/NettyHttpServer.java index 12eb7945..f578cef6 100644 --- a/src/main/java/org/logstash/plugins/inputs/http/NettyHttpServer.java +++ b/src/main/java/org/logstash/plugins/inputs/http/NettyHttpServer.java @@ -26,6 +26,7 @@ public class NettyHttpServer implements Runnable, Closeable { private final int port; private final int connectionBacklog = 128; + private final EventLoopGroup bossGroup; private final EventLoopGroup processorGroup; private final ThreadPoolExecutor executorGroup; private final HttpResponseStatus responseStatus; @@ -37,8 +38,14 @@ public NettyHttpServer(String host, int port, IMessageHandler messageHandler, this.host = host; this.port = port; this.responseStatus = HttpResponseStatus.valueOf(responseCode); + + // boss group is responsible for accepting incoming connections and sending to worker loop + // process group is channel handler, see the https://github.com/netty/netty/discussions/13305 + // see the https://github.com/netty/netty/discussions/11808#discussioncomment-1610918 for why separation is good + bossGroup = new NioEventLoopGroup(1, daemonThreadFactory("http-input-connector")); processorGroup = new NioEventLoopGroup(threads, daemonThreadFactory("http-input-processor")); + // event handler group executorGroup = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxPendingRequests), daemonThreadFactory("http-input-handler-executor"), new CustomRejectedExecutionHandler()); @@ -51,7 +58,7 @@ public NettyHttpServer(String host, int port, IMessageHandler messageHandler, } serverBootstrap = new ServerBootstrap() - .group(processorGroup) + .group(bossGroup, processorGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, connectionBacklog) .childOption(ChannelOption.SO_KEEPALIVE, true) @@ -73,7 +80,9 @@ public void run() { public void close() { try { // stop accepting new connections first - processorGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).sync(); + bossGroup.shutdownGracefully().sync(); + // stop the worker group + processorGroup.shutdownGracefully().sync(); // then shutdown the message handler executor executorGroup.shutdown(); try {