diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 9e96bcf3..efe463eb 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -1,5 +1,7 @@ package org.logstash.beats; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.logging.log4j.LogManager; @@ -88,6 +90,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E logger.info(format("closing (" + cause.getMessage() + ")")); } } else { + PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + OOMConnectionCloser.DirectMemoryUsage usageSnapshot = OOMConnectionCloser.DirectMemoryUsage.capture(allocator); + logger.info("Connection {}, memory status used: {}, pinned: {}, ratio {}", ctx.channel(), usageSnapshot.used, usageSnapshot.pinned, usageSnapshot.ratio); final Throwable realCause = extractCause(cause, 0); if (logger.isDebugEnabled()){ logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause); diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 57a69802..751aba53 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -4,6 +4,8 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.PlatformDependent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,22 +14,31 @@ public class OOMConnectionCloser extends ChannelInboundHandlerAdapter { - private static class DirectMemoryUsage { - private final long used; - private final long pinned; - private final short ratio; + private final PooledByteBufAllocator allocator; - private DirectMemoryUsage(long used, long pinned) { + static class DirectMemoryUsage { + final long used; + final long pinned; + private final PooledByteBufAllocator allocator; + final short ratio; + + private DirectMemoryUsage(long used, long pinned, PooledByteBufAllocator allocator) { this.used = used; this.pinned = pinned; + this.allocator = allocator; this.ratio = (short) Math.round(((double) pinned / used) * 100); } - static DirectMemoryUsage capture() { - PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + static DirectMemoryUsage capture(PooledByteBufAllocator allocator) { long usedDirectMemory = allocator.metric().usedDirectMemory(); long pinnedDirectMemory = allocator.pinnedDirectMemory(); - return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory); + return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory, allocator); + } + + boolean isCloseToOOM() { + long maxDirectMemory = PlatformDependent.maxDirectMemory(); + int chunkSize = allocator.metric().chunkSize(); + return ((maxDirectMemory - used) <= chunkSize) && ratio > 75; } } @@ -35,10 +46,28 @@ static DirectMemoryUsage capture() { public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$"); + OOMConnectionCloser() { + allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator); + logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); + if (direct.isCloseToOOM()) { + logger.warn("Closing connection {} because running out of memory, used: {}, pinned: {}, ratio {}", ctx.channel(), direct.used, direct.pinned, direct.ratio); + ReferenceCountUtil.release(msg); // to free the memory used by the buffer + ctx.flush(); + ctx.close(); + } else { + super.channelRead(ctx, msg); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (isDirectMemoryOOM(cause)) { - DirectMemoryUsage direct = DirectMemoryUsage.capture(); + DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " + "Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index e456640d..90bc733b 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -126,6 +126,7 @@ private class BeatsInitializer extends ChannelInitializer { public void initChannel(SocketChannel socket){ ChannelPipeline pipeline = socket.pipeline(); + pipeline.addLast(new OOMConnectionCloser()); if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); @@ -137,7 +138,6 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); - pipeline.addLast(new OOMConnectionCloser()); pipeline.addLast(new BeatsHandler(localMessageListener)); }