Skip to content

Commit

Permalink
Re-introduce the beats handlers worker group to separata the Beats pr…
Browse files Browse the repository at this point in the history
…otocol processing from the boss group that accepts and listed to new sockets
  • Loading branch information
andsel committed Sep 20, 2023
1 parent 17f0213 commit fdd000f
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.logstash.beats;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand Down Expand Up @@ -114,6 +116,7 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5;

private final EventExecutorGroup idleExecutorGroup;
private final EventExecutorGroup beatsHandlerExecutorGroup;
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

Expand All @@ -122,6 +125,7 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
}

public void initChannel(SocketChannel socket){
Expand All @@ -137,8 +141,8 @@ public void initChannel(SocketChannel socket){
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(new FlowLimiterHandler());
pipeline.addLast(new ThunderingGuardHandler());
pipeline.addLast(new BeatsParser());
pipeline.addLast(new BeatsHandler(localMessageListener));
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener));
}


Expand Down

0 comments on commit fdd000f

Please sign in to comment.