From 173410f09791155abf54b27fdf7fe7fdcb496002 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 14:32:27 +0200 Subject: [PATCH] Re-introduce the beats handlers worker group to separata the Beats protocol processing from the boss group that accepts and listed to new sockets --- src/main/java/org/logstash/beats/Server.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 90bc733b..1254ed9e 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -1,9 +1,11 @@ package org.logstash.beats; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -114,6 +116,7 @@ private class BeatsInitializer extends ChannelInitializer { private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; private final EventExecutorGroup idleExecutorGroup; + private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; @@ -122,6 +125,7 @@ private class BeatsInitializer extends ChannelInitializer { this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); + beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket){ @@ -137,8 +141,8 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); - pipeline.addLast(new BeatsParser()); - pipeline.addLast(new BeatsHandler(localMessageListener)); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser()); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener)); }