Skip to content

Commit

Permalink
Updated OOMConnectionCloser to monitor the consumption of memory also…
Browse files Browse the repository at this point in the history
… during the read opeartion and not only on exception
  • Loading branch information
andsel committed Sep 20, 2023
1 parent 50c4625 commit 17f0213
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/main/java/org/logstash/beats/BeatsHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.logstash.beats;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -88,6 +90,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
logger.info(format("closing (" + cause.getMessage() + ")"));
}
} else {
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
OOMConnectionCloser.DirectMemoryUsage usageSnapshot = OOMConnectionCloser.DirectMemoryUsage.capture(allocator);
logger.info("Connection {}, memory status used: {}, pinned: {}, ratio {}", ctx.channel(), usageSnapshot.used, usageSnapshot.pinned, usageSnapshot.ratio);
final Throwable realCause = extractCause(cause, 0);
if (logger.isDebugEnabled()){
logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause);
Expand Down
47 changes: 38 additions & 9 deletions src/main/java/org/logstash/beats/OOMConnectionCloser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -12,33 +14,60 @@

public class OOMConnectionCloser extends ChannelInboundHandlerAdapter {

private static class DirectMemoryUsage {
private final long used;
private final long pinned;
private final short ratio;
private final PooledByteBufAllocator allocator;

private DirectMemoryUsage(long used, long pinned) {
static class DirectMemoryUsage {
final long used;
final long pinned;
private final PooledByteBufAllocator allocator;
final short ratio;

private DirectMemoryUsage(long used, long pinned, PooledByteBufAllocator allocator) {
this.used = used;
this.pinned = pinned;
this.allocator = allocator;
this.ratio = (short) Math.round(((double) pinned / used) * 100);
}

static DirectMemoryUsage capture() {
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
static DirectMemoryUsage capture(PooledByteBufAllocator allocator) {
long usedDirectMemory = allocator.metric().usedDirectMemory();
long pinnedDirectMemory = allocator.pinnedDirectMemory();
return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory);
return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory, allocator);
}

boolean isCloseToOOM() {
long maxDirectMemory = PlatformDependent.maxDirectMemory();
int chunkSize = allocator.metric().chunkSize();
return ((maxDirectMemory - used) <= chunkSize) && ratio > 75;
}
}

private final static Logger logger = LogManager.getLogger(OOMConnectionCloser.class);

public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$");

OOMConnectionCloser() {
allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator);
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
if (direct.isCloseToOOM()) {
logger.warn("Closing connection {} because running out of memory, used: {}, pinned: {}, ratio {}", ctx.channel(), direct.used, direct.pinned, direct.ratio);
ReferenceCountUtil.release(msg); // to free the memory used by the buffer
ctx.flush();
ctx.close();
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (isDirectMemoryOOM(cause)) {
DirectMemoryUsage direct = DirectMemoryUsage.capture();
DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator);
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " +
"Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {

public void initChannel(SocketChannel socket){
ChannelPipeline pipeline = socket.pipeline();
pipeline.addLast(new OOMConnectionCloser());

if (isSslEnabled()) {
pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket));
Expand All @@ -137,7 +138,6 @@ public void initChannel(SocketChannel socket){
pipeline.addLast(new FlowLimiterHandler());
pipeline.addLast(new ThunderingGuardHandler());
pipeline.addLast(new BeatsParser());
pipeline.addLast(new OOMConnectionCloser());
pipeline.addLast(new BeatsHandler(localMessageListener));
}

Expand Down

0 comments on commit 17f0213

Please sign in to comment.