From aea4678dbda60f7247bbc6e30c2af56302a8afc1 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 15:35:52 +0200 Subject: [PATCH] Added feature flag named protect_direct_memory to control the usage of OOM checking or not. Enabled by default. --- docs/index.asciidoc | 12 ++++++++++++ lib/logstash/inputs/beats.rb | 6 +++++- spec/inputs/beats_spec.rb | 7 ++++--- src/main/java/org/logstash/beats/Runner.java | 2 +- src/main/java/org/logstash/beats/Server.java | 18 ++++++++++++++---- .../java/org/logstash/beats/ServerTest.java | 6 +++--- 6 files changed, 39 insertions(+), 12 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f8547ad8..7c046bd4 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -221,6 +221,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|__Deprecated__ | <> |<>|Yes +| <> |<>|No | <> |<>|__Deprecated__ | <> |a valid filesystem path|No | <> |<>|No @@ -385,6 +386,17 @@ deprecated[6.5.0, Replaced by <>] The port to listen on. +[id="plugins-{type}s-{plugin}-protect_direct_memory"] +===== `protect_direct_memory` + + * Value type is <> + * Default value is `true` + +If enabled, actively check native memory used by network part to do parsing and avoid +out of memory conditions. When the consumption of native memory used is close to +the maximum limit, connections are being closed in undetermined order until the safe +memory condition is reestablished. + [id="plugins-{type}s-{plugin}-ssl"] ===== `ssl` deprecated[6.6.0, Replaced by <>] diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index 3a302bfd..0d59fad0 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -74,6 +74,10 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base # The port to listen on. config :port, :validate => :number, :required => true + # Proactive checks that keep the beats input active when the memory used by protocol parser and network + # related operations is going to terminate. + config :protect_direct_memory, :validate => :boolean, :default => true + # Events are by default sent in plain text. You can # enable encryption by setting `ssl` to true and configuring # the `ssl_certificate` and `ssl_key` options. @@ -243,7 +247,7 @@ def register end # def register def create_server - server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads) + server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, @protect_direct_memory) server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled server end diff --git a/spec/inputs/beats_spec.rb b/spec/inputs/beats_spec.rb index f68a33da..7c0694da 100644 --- a/spec/inputs/beats_spec.rb +++ b/spec/inputs/beats_spec.rb @@ -14,6 +14,7 @@ let(:port) { BeatsInputTest.random_port } let(:client_inactivity_timeout) { 400 } let(:threads) { 1 + rand(9) } + let(:protect_direct_memory) { true } let(:queue) { Queue.new } let(:config) do { @@ -36,7 +37,7 @@ let(:port) { 9001 } it "sends the required options to the server" do - expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads) + expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory) subject.register end end @@ -529,8 +530,8 @@ subject(:plugin) { LogStash::Inputs::Beats.new(config) } before do - @server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads) - expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads).and_return @server + @server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads, protect_direct_memory) + expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory).and_return @server expect( @server ).to receive(:listen) subject.register diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index 0cb623e4..548f6ef1 100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception { // Check for leaks. // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); - Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors()); + Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), true); if(args.length > 0 && args[0].equals("ssl")) { logger.debug("Using SSL"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 1254ed9e..983d80a9 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -22,6 +22,7 @@ public class Server { private final int port; private final String host; private final int beatsHeandlerThreadCount; + private final boolean protectDirectMemory; private NioEventLoopGroup workGroup; private IMessageListener messageListener = new MessageListener(); private SslHandlerProvider sslHandlerProvider; @@ -29,11 +30,16 @@ public class Server { private final int clientInactivityTimeoutSeconds; - public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) { +// public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) { +// this(host, p, clientInactivityTimeoutSeconds, threadCount, true); +// } + + public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, boolean protectDirectMemory) { this.host = host; port = p; this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; beatsHeandlerThreadCount = threadCount; + this.protectDirectMemory = protectDirectMemory; } public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){ @@ -130,7 +136,9 @@ private class BeatsInitializer extends ChannelInitializer { public void initChannel(SocketChannel socket){ ChannelPipeline pipeline = socket.pipeline(); - pipeline.addLast(new OOMConnectionCloser()); + if (protectDirectMemory) { + pipeline.addLast(new OOMConnectionCloser()); + } if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); @@ -139,8 +147,10 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(new FlowLimiterHandler()); - pipeline.addLast(new ThunderingGuardHandler()); + if (protectDirectMemory) { + pipeline.addLast(new FlowLimiterHandler()); + pipeline.addLast(new ThunderingGuardHandler()); + } pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser()); pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener)); } diff --git a/src/test/java/org/logstash/beats/ServerTest.java b/src/test/java/org/logstash/beats/ServerTest.java index 37512cdc..b067e55d 100644 --- a/src/test/java/org/logstash/beats/ServerTest.java +++ b/src/test/java/org/logstash/beats/ServerTest.java @@ -50,7 +50,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte final CountDownLatch latch = new CountDownLatch(concurrentConnections); - final Server server = new Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, true); final AtomicBoolean otherCause = new AtomicBoolean(false); server.setMessageListener(new MessageListener() { public void onNewConnection(ChannelHandlerContext ctx) { @@ -114,7 +114,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt final CountDownLatch latch = new CountDownLatch(concurrentConnections); final AtomicBoolean exceptionClose = new AtomicBoolean(false); - final Server server = new Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, true); server.setMessageListener(new MessageListener() { @Override public void onNewConnection(ChannelHandlerContext ctx) { @@ -170,7 +170,7 @@ public void run() { @Test public void testServerShouldAcceptConcurrentConnection() throws InterruptedException { - final Server server = new Server(host, randomPort, 30, threadCount); + final Server server = new Server(host, randomPort, 30, threadCount, true); SpyListener listener = new SpyListener(); server.setMessageListener(listener); Runnable serverTask = new Runnable() {