From dc8a8b07e76de63d5233226ef7c9d98e60d12361 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Jul 2023 15:48:10 +0200 Subject: [PATCH 01/19] Avoid to run Beats parser and Beats protocol handler in separate executors group (beatsHandlerExecutorGroup) --- src/main/java/org/logstash/beats/Server.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c343aaf6..5c4e0d25 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -112,7 +112,6 @@ private class BeatsInitializer extends ChannelInitializer { private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; private final EventExecutorGroup idleExecutorGroup; - private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; @@ -121,7 +120,6 @@ private class BeatsInitializer extends ChannelInitializer { this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); - beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket){ @@ -134,7 +132,8 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); + pipeline.addLast(new BeatsParser()); + pipeline.addLast(new BeatsHandler(localMessageListener)); } @@ -152,7 +151,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E 
public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); - beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); } From 700543c0688176d9eb2fd2005d1920e4d8b2f788 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Jul 2023 15:59:57 +0200 Subject: [PATCH 02/19] Ported core part of original #410 PR --- .../java/org/logstash/beats/BeatsParser.java | 84 +++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 812150b1..928b7649 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -3,8 +3,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.DecoderException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,12 +16,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.zip.Inflater; import java.util.zip.InflaterOutputStream; public class BeatsParser extends ByteToMessageDecoder { private final static Logger logger = LogManager.getLogger(BeatsParser.class); + private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory(); private Batch batch; @@ -45,15 +49,18 @@ private enum States { private int requiredBytes = 0; private int sequence = 0; private boolean decodingCompressedBuffer = false; + private long usedDirectMemory; + private boolean closeCalled = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { - 
if(!hasEnoughBytes(in)) { - if (decodingCompressedBuffer){ + if (!hasEnoughBytes(in)) { + if (decodingCompressedBuffer) { throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState); } return; } + usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); switch (currentState) { case READ_HEADER: { @@ -182,6 +189,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t case READ_COMPRESSED_FRAME: { logger.trace("Running: READ_COMPRESSED_FRAME"); + + if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) { + ctx.channel().config().setAutoRead(false); + ctx.close(); + closeCalled = true; + throw new IOException("not enough memory to decompress this from " + ctx.channel().id()); + } inflateCompressedFrame(ctx, in, (buffer) -> { transition(States.READ_HEADER); @@ -192,6 +206,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } finally { decodingCompressedBuffer = false; + ctx.channel().config().setAutoRead(false); + ctx.channel().eventLoop().schedule(() -> { + ctx.channel().config().setAutoRead(true); + }, 5, TimeUnit.MILLISECONDS); transition(States.READ_HEADER); } }); @@ -199,9 +217,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - ((V2Batch)batch).addMessage(sequence, in, requiredBytes); - if(batch.isComplete()) { - if(logger.isTraceEnabled()) { + ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + if (batch.isComplete()) { + if (logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); } out.add(batch); @@ -260,6 +278,62 @@ private void batchComplete() { batch = null; } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //System.out.println("channelRead(" + ctx.channel().isActive() + ": " + 
ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer); + if (closeCalled) { + ((ByteBuf) msg).release(); + //if(batch != null) batch.release(); + return; + } + usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); + + // If we're just beginning a new frame on this channel, + // don't accumulate more data for 25 ms if usage of direct memory is above 20% + // + // The goal here is to avoid thundering herd: many beats connecting and sending data + // at the same time. As some channels progress to other states they'll use more memory + // but also give it back once a full batch is read. + if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { + if (usedDirectMemory > (maxDirectMemory * 0.40)) { + ctx.channel().config().setAutoRead(false); + //System.out.println("pausing reads on " + ctx.channel().id()); + ctx.channel().eventLoop().schedule(() -> { + //System.out.println("resuming reads on " + ctx.channel().id()); + ctx.channel().config().setAutoRead(true); + }, 200, TimeUnit.MILLISECONDS); + } else { + //System.out.println("no need to pause reads on " + ctx.channel().id()); + } + } else if (usedDirectMemory > maxDirectMemory * 0.90) { + ctx.channel().config().setAutoRead(false); + ctx.close(); + closeCalled = true; + ((ByteBuf) msg).release(); + if (batch != null) batch.release(); + throw new IOException("about to explode, cut them all down " + ctx.channel().id()); + } + super.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); + if (cause instanceof DecoderException) { + ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else if (cause instanceof OutOfMemoryError) { + cause.printStackTrace(); + 
ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else if (cause instanceof IOException) { + ctx.channel().config().setAutoRead(false); + if (!closeCalled) ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } + @FunctionalInterface private interface CheckedConsumer { void accept(T t) throws IOException; From 8e5f7b392c98fec4467447dd12e9fce42456ccd3 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 7 Jul 2023 14:36:51 +0200 Subject: [PATCH 03/19] Added pull way of fetching data --- .../java/org/logstash/beats/BeatsParser.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 928b7649..b264aee9 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -52,6 +52,8 @@ private enum States { private long usedDirectMemory; private boolean closeCalled = false; + private boolean stopAutoreadRequested = false; + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { if (!hasEnoughBytes(in)) { @@ -296,17 +298,25 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // but also give it back once a full batch is read. 
if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { if (usedDirectMemory > (maxDirectMemory * 0.40)) { - ctx.channel().config().setAutoRead(false); - //System.out.println("pausing reads on " + ctx.channel().id()); - ctx.channel().eventLoop().schedule(() -> { - //System.out.println("resuming reads on " + ctx.channel().id()); - ctx.channel().config().setAutoRead(true); - }, 200, TimeUnit.MILLISECONDS); + if (!stopAutoreadRequested) { + ctx.channel().config().setAutoRead(false); + stopAutoreadRequested = true; + logger.info("Set channel {} to autoread FALSE (> 40%)", ctx.channel()); + //System.out.println("pausing reads on " + ctx.channel().id()); + ctx.channel().eventLoop().schedule(() -> { + //System.out.println("resuming reads on " + ctx.channel().id()); + ctx.channel().config().setAutoRead(true); + stopAutoreadRequested = false; + logger.info("Set channel {} to autoread TRUE after 200 ms", ctx.channel()); + }, 200, TimeUnit.MILLISECONDS); + } } else { //System.out.println("no need to pause reads on " + ctx.channel().id()); } } else if (usedDirectMemory > maxDirectMemory * 0.90) { ctx.channel().config().setAutoRead(false); + stopAutoreadRequested = true; + logger.info("Set channel {} to autoread FALSE (> 90%)", ctx.channel()); ctx.close(); closeCalled = true; ((ByteBuf) msg).release(); @@ -316,6 +326,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception super.channelRead(ctx, msg); } + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); From dbabb78f9c8460e969aadd7a3ca30b13bce708ce Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 11:48:28 +0200 Subject: [PATCH 04/19] Extracted the pull mode part into a separate handler --- .../java/org/logstash/beats/BeatsParser.java | 16 --------- .../logstash/beats/FlowLimiterHandler.java | 35 +++++++++++++++++++ src/main/java/org/logstash/beats/Server.java | 1 + 3 files changed, 36 insertions(+), 16 deletions(-) create mode 100644 src/main/java/org/logstash/beats/FlowLimiterHandler.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index b264aee9..22756054 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -326,22 +326,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception super.channelRead(ctx, msg); } - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - if (!ctx.channel().config().isAutoRead()) { - ctx.channel().read(); - } - } - - @Override - public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { - super.channelReadComplete(ctx); - if (!ctx.channel().config().isAutoRead()) { - ctx.channel().read(); - } - } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java new file mode 100644 index 00000000..5ebeed04 --- /dev/null +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -0,0 +1,35 @@ +package org.logstash.beats; + +import 
io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Configure the channel where it's installed to operate the read in pull mode, + * disabling the autoread and explicitly invoking the read operation. + * */ +@ChannelHandler.Sharable +public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { + ctx.channel().config().setAutoRead(false); + super.channelRegistered(ctx); + } + + @Override + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } + + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + if (!ctx.channel().config().isAutoRead()) { + ctx.channel().read(); + } + } +} diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 5c4e0d25..d135a548 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -132,6 +132,7 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); + pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new BeatsParser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } From ea7f14dcd5f6e772d99781a833d0f5022f2da634 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 14:52:51 +0200 Subject: [PATCH 05/19] Update the flow control handler to avoid new reads if the channel becomes not writable, to excert backpressure to the sender system --- 
.../logstash/beats/FlowLimiterHandler.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java index 5ebeed04..5d88effd 100644 --- a/src/main/java/org/logstash/beats/FlowLimiterHandler.java +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -1,16 +1,24 @@ package org.logstash.beats; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Configure the channel where it's installed to operate the read in pull mode, * disabling the autoread and explicitly invoking the read operation. + * The flow control to keep the outgoing buffer under control is done + * avoiding to read in new bytes if the outgoing direction became not writable, this + * excert back pressure to the TCP layer and ultimately to the upstream system. 
* */ @ChannelHandler.Sharable public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { + private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class); + @Override public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { ctx.channel().config().setAutoRead(false); @@ -20,7 +28,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); - if (!ctx.channel().config().isAutoRead()) { + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { ctx.channel().read(); } } @@ -28,8 +36,21 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { @Override public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); - if (!ctx.channel().config().isAutoRead()) { + if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) { ctx.channel().read(); } } + + private boolean isAutoreadDisabled(Channel channel) { + return !channel.config().isAutoRead(); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + ctx.channel().read(); + super.channelWritabilityChanged(ctx); + + logger.debug("Writability on channel {} changed to {}", ctx.channel(), ctx.channel().isWritable()); + } + } From 234569d0448f9ecc621ba63bb4ccc709c340b28a Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 12 Jul 2023 16:00:54 +0200 Subject: [PATCH 06/19] Separated the logic to drop incoming connections into specific handler On new channel registration (that correspond to a new client connection), verifies the direct memory stastus to understand if almost the totality max direct memory is reached and also if the majoproity of that space is used by pinned byte buffers. 
If the codition is verified that means direct memory avvailable is terminating, so no new connection would help in the situation, and the incoming new connections are closed. --- .../java/org/logstash/beats/BeatsParser.java | 80 ------------------- .../logstash/beats/FlowLimiterHandler.java | 6 +- src/main/java/org/logstash/beats/Server.java | 1 + .../beats/ThunderingGuardHandler.java | 40 ++++++++++ 4 files changed, 44 insertions(+), 83 deletions(-) create mode 100644 src/main/java/org/logstash/beats/ThunderingGuardHandler.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 22756054..c53e0d35 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -23,7 +23,6 @@ public class BeatsParser extends ByteToMessageDecoder { private final static Logger logger = LogManager.getLogger(BeatsParser.class); - private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory(); private Batch batch; @@ -49,10 +48,6 @@ private enum States { private int requiredBytes = 0; private int sequence = 0; private boolean decodingCompressedBuffer = false; - private long usedDirectMemory; - private boolean closeCalled = false; - - private boolean stopAutoreadRequested = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws InvalidFrameProtocolException, IOException { @@ -62,7 +57,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } return; } - usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); switch (currentState) { case READ_HEADER: { @@ -192,12 +186,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t case READ_COMPRESSED_FRAME: { logger.trace("Running: READ_COMPRESSED_FRAME"); - if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) { - ctx.channel().config().setAutoRead(false); - 
ctx.close(); - closeCalled = true; - throw new IOException("not enough memory to decompress this from " + ctx.channel().id()); - } inflateCompressedFrame(ctx, in, (buffer) -> { transition(States.READ_HEADER); @@ -208,10 +196,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } } finally { decodingCompressedBuffer = false; - ctx.channel().config().setAutoRead(false); - ctx.channel().eventLoop().schedule(() -> { - ctx.channel().config().setAutoRead(true); - }, 5, TimeUnit.MILLISECONDS); transition(States.READ_HEADER); } }); @@ -280,70 +264,6 @@ private void batchComplete() { batch = null; } - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - //System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer); - if (closeCalled) { - ((ByteBuf) msg).release(); - //if(batch != null) batch.release(); - return; - } - usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory(); - - // If we're just beginning a new frame on this channel, - // don't accumulate more data for 25 ms if usage of direct memory is above 20% - // - // The goal here is to avoid thundering herd: many beats connecting and sending data - // at the same time. As some channels progress to other states they'll use more memory - // but also give it back once a full batch is read. 
- if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) { - if (usedDirectMemory > (maxDirectMemory * 0.40)) { - if (!stopAutoreadRequested) { - ctx.channel().config().setAutoRead(false); - stopAutoreadRequested = true; - logger.info("Set channel {} to autoread FALSE (> 40%)", ctx.channel()); - //System.out.println("pausing reads on " + ctx.channel().id()); - ctx.channel().eventLoop().schedule(() -> { - //System.out.println("resuming reads on " + ctx.channel().id()); - ctx.channel().config().setAutoRead(true); - stopAutoreadRequested = false; - logger.info("Set channel {} to autoread TRUE after 200 ms", ctx.channel()); - }, 200, TimeUnit.MILLISECONDS); - } - } else { - //System.out.println("no need to pause reads on " + ctx.channel().id()); - } - } else if (usedDirectMemory > maxDirectMemory * 0.90) { - ctx.channel().config().setAutoRead(false); - stopAutoreadRequested = true; - logger.info("Set channel {} to autoread FALSE (> 90%)", ctx.channel()); - ctx.close(); - closeCalled = true; - ((ByteBuf) msg).release(); - if (batch != null) batch.release(); - throw new IOException("about to explode, cut them all down " + ctx.channel().id()); - } - super.channelRead(ctx, msg); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage()); - if (cause instanceof DecoderException) { - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else if (cause instanceof OutOfMemoryError) { - cause.printStackTrace(); - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else if (cause instanceof IOException) { - ctx.channel().config().setAutoRead(false); - if (!closeCalled) ctx.close(); - } else { - super.exceptionCaught(ctx, cause); - } - } - @FunctionalInterface private interface CheckedConsumer { void 
accept(T t) throws IOException; diff --git a/src/main/java/org/logstash/beats/FlowLimiterHandler.java b/src/main/java/org/logstash/beats/FlowLimiterHandler.java index 5d88effd..6a0da517 100644 --- a/src/main/java/org/logstash/beats/FlowLimiterHandler.java +++ b/src/main/java/org/logstash/beats/FlowLimiterHandler.java @@ -1,20 +1,20 @@ package org.logstash.beats; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** - * Configure the channel where it's installed to operate the read in pull mode, + * Configure the channel where it's installed to operate the reads in pull mode, * disabling the autoread and explicitly invoking the read operation. * The flow control to keep the outgoing buffer under control is done * avoiding to read in new bytes if the outgoing direction became not writable, this * excert back pressure to the TCP layer and ultimately to the upstream system. 
* */ -@ChannelHandler.Sharable +@Sharable public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter { private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index d135a548..c791185c 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -133,6 +133,7 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); pipeline.addLast(new FlowLimiterHandler()); + pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java new file mode 100644 index 00000000..0b63e4ab --- /dev/null +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -0,0 +1,40 @@ +package org.logstash.beats; + +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * This handler is responsible to avoid accepting new connections when the direct memory + * consumption is close to the MaxDirectMemorySize. + * + * If the total allocated direct memory is close to the max memory size and also the pinned + * bytes from the direct memory allocator is close to the direct memory used, then it drops the new + * incoming connections. 
+ * */ +@Sharable +public final class ThunderingGuardHandler extends ChannelInboundHandlerAdapter { + + private final static long MAX_DIRECT_MEMORY = io.netty.util.internal.PlatformDependent.maxDirectMemory(); + + private final static Logger logger = LogManager.getLogger(ThunderingGuardHandler.class); + + @Override + public void channelRegistered(final ChannelHandlerContext ctx) throws Exception { + PooledByteBufAllocator pooledAllocator = (PooledByteBufAllocator) ctx.alloc(); + long usedDirectMemory = pooledAllocator.metric().usedDirectMemory(); + if (usedDirectMemory > MAX_DIRECT_MEMORY * 0.90) { + long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); + if (pinnedDirectMemory >= usedDirectMemory * 0.80) { + ctx.close(); + return; + } + } + + super.channelRegistered(ctx); + } +} From 86e4445965fc54c650ed63765ac22f656c4c0a64 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 12:25:12 +0200 Subject: [PATCH 07/19] Fist draft of the integration test --- .../beats/ThunderingGuardHandler.java | 4 +- .../beats/FlowLimiterHandlerTest.java | 247 ++++++++++++++++++ 2 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java index 0b63e4ab..397d8acf 100644 --- a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -1,7 +1,6 @@ package org.logstash.beats; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -11,7 +10,7 @@ /** * This handler is responsible to avoid accepting new connections when the direct memory * consumption is close to the MaxDirectMemorySize. - * + *

* If the total allocated direct memory is close to the max memory size and also the pinned * bytes from the direct memory allocator is close to the direct memory used, then it drops the new * incoming connections. @@ -31,6 +30,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); if (pinnedDirectMemory >= usedDirectMemory * 0.80) { ctx.close(); + logger.info("Dropping connection {} due to high resource consumption", ctx.channel()); return; } } diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java new file mode 100644 index 00000000..c78e5b44 --- /dev/null +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -0,0 +1,247 @@ +package org.logstash.beats; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.SocketChannelConfig; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class FlowLimiterHandlerTest { + + + @Test + public void testChannelInNotWriteable() { + EmbeddedChannel channel = new EmbeddedChannel(); + assertTrue(channel.isWritable()); + + makeChannelNotWriteablePassingTheOutboundHighWaterMark(channel); + + assertFalse(channel.isWritable()); + } + + private static void makeChannelNotWriteablePassingTheOutboundHighWaterMark(EmbeddedChannel channel) { + int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); + // generate payload to go over high watermark and 
trigger not writeable + final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); + assertEquals(writeBufferHighWaterMark, payload.readableBytes()); + assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.duplicate())); + assertTrue(channel.finish()); + assertFalse("Channel shouldn't be writeable", channel.isWritable()); + } + + @Test + public void givenAnEmptyChannelThenMessagesCanPassthrough() { + EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()); + + // write some data + final ByteBuf sample = prepareSample(16); + assertTrue(channel.writeInbound(sample.duplicate())); + assertTrue(channel.finish()); + + // read some data from the writeable channel + final ByteBuf readData = channel.readInbound(); + assertNotNull(readData); + assertEquals(sample, readData); + readData.release(); + } + + boolean writeable = true; + + @Test + public void givenANotWriteableChannelNoMessagesPassthrough() { + EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()) { +// @Override +// public boolean isWritable() { +// unsafe().outboundBuffer().remove() +// return writeable; +// } + +// @Override +// protected void doWrite(ChannelOutboundBuffer in) throws Exception { +// for (;;) { +// Object msg = in.current(); +// if (msg == null) { +// break; +// } +// } +// } + +// @Override +// public Channel flush() { +// // avoid to flush the outbound buffer to that is remains not writeable +// return this; +// } + }; + + int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); + // generate payload to go over high watermark and trigger not writeable + final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); + assertEquals(writeBufferHighWaterMark, payload.readableBytes()); +// for (int i = 0; i < 16; i++) { +// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); +// } + +// int numBuffers = 0; +// while (channel.isWritable()) { +// assertTrue("data was 
added to outbound buffer", channel.writeOutbound(payload.copy())); +// numBuffers ++; +// } + +// assertTrue(channel.finish()); +// channel.releaseOutbound(); +// channel.runPendingTasks(); + channel.flushOutbound(); +// writeable = false; + assertFalse("Channel shouldn't be writeable", channel.isWritable()); + + // write some data + final ByteBuf sample = prepareSample(16); + assertTrue(channel.writeInbound(sample.duplicate())); + assertTrue(channel.finish()); + + // read some data from the writeable channel + final ByteBuf readData = channel.readInbound(); + assertNull(readData); + readData.release(); + } + + private ByteBuf prepareSample(int numBytes) { + return prepareSample(numBytes, 'A'); + } + + private static ByteBuf prepareSample(int numBytes, char c) { + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(numBytes); + for (int i = 0; i < numBytes; i++) { + payload.writeByte(c); + } + return payload; + } + + + boolean anyOtherCall = false; + @Test + public void testIntegration() throws Exception { + final int highWaterMark = 32 * 1024; + FlowLimiterHandler sut = new FlowLimiterHandler(); + + NioEventLoopGroup group = new NioEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + try { + b.group(group) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setWriteBufferHighWaterMark(highWaterMark); + ch.pipeline() + .addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + System.out.println("Client connected"); + // write as much to move the channel in not writable state + final ByteBuf payload = prepareSample(highWaterMark, 'C'); + ByteBuf copy = payload.copy(); + int readableBytes = copy.readableBytes(); + ctx.pipeline().writeAndFlush(copy).addListener(new ChannelFutureListener() { + @Override + public void 
operationComplete(ChannelFuture future) throws Exception { + System.out.println("Bytes to be read: " + readableBytes + " now availables:" + copy.readableBytes()); + } + }); +// System.out.println("Bytes to be read: " + readableBytes + " now:" + copy.readableBytes()); +// payload.release(); + + System.out.println("Channel isWritable?" + ctx.channel().isWritable()); + int numBuffers = 0; + while (ctx.channel().isWritable()) { + ctx.pipeline().writeAndFlush(payload.copy()); + numBuffers ++; + } + System.out.println("Num buffers wrote to get the writable false: " + numBuffers); + System.out.println("Channel isWritable? " + ctx.channel().isWritable() + " high water mark: " + ctx.channel().config().getWriteBufferHighWaterMark()); + // ask the client to send some data present on the channel + int receiveBufferSize = ((SocketChannelConfig) ctx.channel().config()).getReceiveBufferSize(); + System.out.println("Server's receive buffer size: " + receiveBufferSize); + clientChannel.writeAndFlush(prepareSample(32)); + } + }) + .addLast(sut) + .addLast(new SimpleChannelInboundHandler() { + boolean firstChunkRead = false; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + System.out.println("Last handler Buffer read size: " + msg.readableBytes()); + if (!firstChunkRead) { + assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); + firstChunkRead = true; + + // client write other data that MUSTN'T be read by the server, because + // is in rate limiting. 
+ clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + // on successful flush schedule a shutdown + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + group.shutdownGracefully(); + } + }, 2, TimeUnit.SECONDS); + } + }); + + } else { + // the first read happened, no other reads are commanded by the server + // should never pass here + anyOtherCall = true; + } + } + }); + } + }); + ChannelFuture future = b.bind("0.0.0.0", 1234).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + startAClient(group); + } + } + }).sync(); + future.channel().closeFuture().sync(); + } finally { + group.shutdownGracefully().sync(); + } + + assertFalse("Shouldn't never be notified other data while in rate limiting", anyOtherCall); + } + + Channel clientChannel; + + private void startAClient(NioEventLoopGroup group) { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.config().setAutoRead(false); + clientChannel = ch; + } + }); + b.connect("localhost", 1234); + } + +} \ No newline at end of file From 0148987a1ea033f09f7ecd59972fb5c90eabb27d Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 12:26:34 +0200 Subject: [PATCH 08/19] Removed tests using EmbeddedChannel because doesn't manage the writeable status due to that offload every message to the outbound list --- .../beats/FlowLimiterHandlerTest.java | 97 ------------------- 1 file changed, 97 deletions(-) diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java index c78e5b44..00f6e62b 100644 --- 
a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ -19,103 +19,6 @@ public class FlowLimiterHandlerTest { - - @Test - public void testChannelInNotWriteable() { - EmbeddedChannel channel = new EmbeddedChannel(); - assertTrue(channel.isWritable()); - - makeChannelNotWriteablePassingTheOutboundHighWaterMark(channel); - - assertFalse(channel.isWritable()); - } - - private static void makeChannelNotWriteablePassingTheOutboundHighWaterMark(EmbeddedChannel channel) { - int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); - // generate payload to go over high watermark and trigger not writeable - final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); - assertEquals(writeBufferHighWaterMark, payload.readableBytes()); - assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.duplicate())); - assertTrue(channel.finish()); - assertFalse("Channel shouldn't be writeable", channel.isWritable()); - } - - @Test - public void givenAnEmptyChannelThenMessagesCanPassthrough() { - EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()); - - // write some data - final ByteBuf sample = prepareSample(16); - assertTrue(channel.writeInbound(sample.duplicate())); - assertTrue(channel.finish()); - - // read some data from the writeable channel - final ByteBuf readData = channel.readInbound(); - assertNotNull(readData); - assertEquals(sample, readData); - readData.release(); - } - - boolean writeable = true; - - @Test - public void givenANotWriteableChannelNoMessagesPassthrough() { - EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler()) { -// @Override -// public boolean isWritable() { -// unsafe().outboundBuffer().remove() -// return writeable; -// } - -// @Override -// protected void doWrite(ChannelOutboundBuffer in) throws Exception { -// for (;;) { -// Object msg = in.current(); -// if (msg == null) { -// break; 
-// } -// } -// } - -// @Override -// public Channel flush() { -// // avoid to flush the outbound buffer to that is remains not writeable -// return this; -// } - }; - - int writeBufferHighWaterMark = channel.config().getWriteBufferHighWaterMark(); - // generate payload to go over high watermark and trigger not writeable - final ByteBuf payload = prepareSample(writeBufferHighWaterMark, 'C'); - assertEquals(writeBufferHighWaterMark, payload.readableBytes()); -// for (int i = 0; i < 16; i++) { -// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); -// } - -// int numBuffers = 0; -// while (channel.isWritable()) { -// assertTrue("data was added to outbound buffer", channel.writeOutbound(payload.copy())); -// numBuffers ++; -// } - -// assertTrue(channel.finish()); -// channel.releaseOutbound(); -// channel.runPendingTasks(); - channel.flushOutbound(); -// writeable = false; - assertFalse("Channel shouldn't be writeable", channel.isWritable()); - - // write some data - final ByteBuf sample = prepareSample(16); - assertTrue(channel.writeInbound(sample.duplicate())); - assertTrue(channel.finish()); - - // read some data from the writeable channel - final ByteBuf readData = channel.readInbound(); - assertNull(readData); - readData.release(); - } - private ByteBuf prepareSample(int numBytes) { return prepareSample(numBytes, 'A'); } From af0473315e457f11ec09c18b165ef42d740610b9 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 15:38:08 +0200 Subject: [PATCH 09/19] Reshaped the asynch code to be more linear --- .../beats/FlowLimiterHandlerTest.java | 187 +++++++++++------- 1 file changed, 116 insertions(+), 71 deletions(-) diff --git a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java index 00f6e62b..f268621e 100644 --- a/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java +++ b/src/test/java/org/logstash/beats/FlowLimiterHandlerTest.java @@ 
-4,22 +4,29 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; -import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Test; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import static org.junit.Assert.*; public class FlowLimiterHandlerTest { - private ByteBuf prepareSample(int numBytes) { + private ReadMessagesCollector readMessagesCollector; + + private static ByteBuf prepareSample(int numBytes) { return prepareSample(numBytes, 'A'); } @@ -31,15 +38,101 @@ private static ByteBuf prepareSample(int numBytes, char c) { return payload; } + private ChannelInboundHandlerAdapter onClientConnected(Consumer action) { + return new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + action.accept(ctx); + } + }; + } + + private static class ReadMessagesCollector extends SimpleChannelInboundHandler { + private Channel clientChannel; + private final NioEventLoopGroup group; + boolean firstChunkRead = false; + + ReadMessagesCollector(NioEventLoopGroup group) { + this.group = group; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + if (!firstChunkRead) { + assertEquals("Expect to read a first chunk and no others", 
32, msg.readableBytes()); + firstChunkRead = true; + + // client write other data that MUSTN'T be read by the server, because + // is rate limited. + clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // on successful flush schedule a shutdown + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + group.shutdownGracefully(); + } + }, 2, TimeUnit.SECONDS); + } else { + ctx.fireExceptionCaught(future.cause()); + } + } + }); + + } else { + // the first read happened, no other reads are commanded by the server + // should never pass here + fail("Shouldn't never be notified other data while in rate limiting"); + } + } + + public void updateClient(Channel clientChannel) { + assertNotNull(clientChannel); + this.clientChannel = clientChannel; + } + } + + + private static class AssertionsHandler extends ChannelInboundHandlerAdapter { + + private final NioEventLoopGroup group; + + private Throwable lastError; + + public AssertionsHandler(NioEventLoopGroup group) { + this.group = group; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + lastError = cause; + group.shutdownGracefully(); + } + + public void assertNoErrors() { + if (lastError != null) { + if (lastError instanceof AssertionError) { + throw (AssertionError) lastError; + } else { + fail("Failed with error" + lastError); + } + } + } + } - boolean anyOtherCall = false; @Test - public void testIntegration() throws Exception { + public void givenAChannelInNotWriteableStateWhenNewBuffersAreSentByClientThenNoDecodeTakePartOnServerSide() throws Exception { final int highWaterMark = 32 * 1024; FlowLimiterHandler sut = new FlowLimiterHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); + + readMessagesCollector = new 
ReadMessagesCollector(group); + AssertionsHandler assertionsHandler = new AssertionsHandler(group); try { b.group(group) .channel(NioServerSocketChannel.class) @@ -48,71 +141,15 @@ public void testIntegration() throws Exception { protected void initChannel(SocketChannel ch) throws Exception { ch.config().setWriteBufferHighWaterMark(highWaterMark); ch.pipeline() - .addLast(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelActive(ctx); - System.out.println("Client connected"); - // write as much to move the channel in not writable state - final ByteBuf payload = prepareSample(highWaterMark, 'C'); - ByteBuf copy = payload.copy(); - int readableBytes = copy.readableBytes(); - ctx.pipeline().writeAndFlush(copy).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - System.out.println("Bytes to be read: " + readableBytes + " now availables:" + copy.readableBytes()); - } - }); -// System.out.println("Bytes to be read: " + readableBytes + " now:" + copy.readableBytes()); -// payload.release(); - - System.out.println("Channel isWritable?" + ctx.channel().isWritable()); - int numBuffers = 0; - while (ctx.channel().isWritable()) { - ctx.pipeline().writeAndFlush(payload.copy()); - numBuffers ++; - } - System.out.println("Num buffers wrote to get the writable false: " + numBuffers); - System.out.println("Channel isWritable? 
" + ctx.channel().isWritable() + " high water mark: " + ctx.channel().config().getWriteBufferHighWaterMark()); - // ask the client to send some data present on the channel - int receiveBufferSize = ((SocketChannelConfig) ctx.channel().config()).getReceiveBufferSize(); - System.out.println("Server's receive buffer size: " + receiveBufferSize); - clientChannel.writeAndFlush(prepareSample(32)); - } - }) + .addLast(onClientConnected(ctx -> { + // write as much to move the channel in not writable state + fillOutboundWatermark(ctx, highWaterMark); + // ask the client to send some data present on the channel + clientChannel.writeAndFlush(prepareSample(32)); + })) .addLast(sut) - .addLast(new SimpleChannelInboundHandler() { - boolean firstChunkRead = false; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - System.out.println("Last handler Buffer read size: " + msg.readableBytes()); - if (!firstChunkRead) { - assertEquals("Expect to read a first chunk and no others", 32, msg.readableBytes()); - firstChunkRead = true; - - // client write other data that MUSTN'T be read by the server, because - // is in rate limiting. 
- clientChannel.writeAndFlush(prepareSample(16)).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // on successful flush schedule a shutdown - ctx.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - group.shutdownGracefully(); - } - }, 2, TimeUnit.SECONDS); - } - }); - - } else { - // the first read happened, no other reads are commanded by the server - // should never pass here - anyOtherCall = true; - } - } - }); + .addLast(readMessagesCollector) + .addLast(assertionsHandler); } }); ChannelFuture future = b.bind("0.0.0.0", 1234).addListener(new ChannelFutureListener() { @@ -128,7 +165,14 @@ public void operationComplete(ChannelFuture future) throws Exception { group.shutdownGracefully().sync(); } - assertFalse("Shouldn't never be notified other data while in rate limiting", anyOtherCall); + assertionsHandler.assertNoErrors(); + } + + private static void fillOutboundWatermark(ChannelHandlerContext ctx, int highWaterMark) { + final ByteBuf payload = prepareSample(highWaterMark, 'C'); + while (ctx.channel().isWritable()) { + ctx.pipeline().writeAndFlush(payload.copy()); + } } Channel clientChannel; @@ -142,6 +186,7 @@ private void startAClient(NioEventLoopGroup group) { protected void initChannel(SocketChannel ch) throws Exception { ch.config().setAutoRead(false); clientChannel = ch; + readMessagesCollector.updateClient(clientChannel); } }); b.connect("localhost", 1234); From cd7aafe2070a175ffc4dc28b9270dd1ee5da948e Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 17 Jul 2023 18:29:49 +0200 Subject: [PATCH 10/19] Covered the number of connection limiter with unit test --- .../beats/ThunderingGuardHandlerTest.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java diff --git a/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java 
b/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java new file mode 100644 index 00000000..d93f6582 --- /dev/null +++ b/src/test/java/org/logstash/beats/ThunderingGuardHandlerTest.java @@ -0,0 +1,83 @@ +package org.logstash.beats; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCounted; +import io.netty.util.internal.PlatformDependent; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class ThunderingGuardHandlerTest { + + public static final int MB = 1024 * 1024; + public static final long MAX_DIRECT_MEMORY_BYTES = PlatformDependent.maxDirectMemory(); + + @Test + public void testVerifyDirectMemoryCouldGoBeyondThe90Percent() { + // allocate 90% of direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + long usedDirectMemory = PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory(); + long pinnedDirectMemory = PooledByteBufAllocator.DEFAULT.pinnedDirectMemory(); + + // verify + assertTrue("Direct memory allocation should be > 90% of the max available", usedDirectMemory > 0.9 * MAX_DIRECT_MEMORY_BYTES); + assertTrue("Direct memory usage should be > 80% of the max available", pinnedDirectMemory > 0.8 * MAX_DIRECT_MEMORY_BYTES); + + allocatedBuffers.forEach(ReferenceCounted::release); + payload.release(); + } + + private static List allocateDirectMemory(long maxDirectMemoryBytes, double percentage) { + List allocatedBuffers = new ArrayList<>(); + final long numBuffersToAllocate = (long) (maxDirectMemoryBytes / MB * percentage); + for (int i = 0; i < numBuffersToAllocate; i++) { + allocatedBuffers.add(PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB)); + } + return allocatedBuffers; + } + + @Test + public void 
givenUsedDirectMemoryAndPinnedMemoryAreCloseToTheMaxDirectAvailableWhenNewConnectionIsCreatedThenItIsReject() { + EmbeddedChannel channel = new EmbeddedChannel(new ThunderingGuardHandler()); + + // consume > 90% of the direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + + channel.pipeline().fireChannelRegistered(); + + // verify + assertFalse("Under constrained memory new channels has to be forcibly closed", channel.isOpen()); + + allocatedBuffers.forEach(ReferenceCounted::release); + payload.release(); + } + + @Test + public void givenUsedDirectMemoryAndNotPinnedWhenNewConnectionIsCreatedThenItIsAccepted() { + EmbeddedChannel channel = new EmbeddedChannel(new ThunderingGuardHandler()); + + // consume > 90% of the direct memory + List allocatedBuffers = allocateDirectMemory(MAX_DIRECT_MEMORY_BYTES, 0.9); + allocatedBuffers.forEach(ReferenceCounted::release); + // allocate one more + ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(1 * MB); + payload.release(); + + channel.pipeline().fireChannelRegistered(); + + // verify + assertTrue("Despite memory is allocated but not pinned, new connections MUST be accepted", channel.isOpen()); + + } + +} \ No newline at end of file From c6e775cf0323d0066b6b7de3e81fdab8f5de042d Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 20 Jul 2023 16:54:18 +0200 Subject: [PATCH 11/19] Pessimistic remediation, when a direct OOM happens close the channel --- .../java/org/logstash/beats/BeatsParser.java | 12 +++- .../logstash/beats/OOMConnectionCloser.java | 60 +++++++++++++++++++ src/main/java/org/logstash/beats/Server.java | 3 + 3 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/logstash/beats/OOMConnectionCloser.java diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index c53e0d35..e8adef60 
100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; @@ -203,7 +204,16 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + try { + ((V2Batch) batch).addMessage(sequence, in, requiredBytes); + } catch (Throwable th) { + // batch has to release its internal buffer before bubbling up the exception + batch.release(); + + // re throw the same error after released the internal buffer + throw th; + } + if (batch.isComplete()) { if (logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java new file mode 100644 index 00000000..5ba3effb --- /dev/null +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -0,0 +1,60 @@ +package org.logstash.beats; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class OOMConnectionCloser extends ChannelInboundHandlerAdapter { + + private static class DirectMemoryUsage { + private final long used; + private final long pinned; + private final short ratio; + + private DirectMemoryUsage(long used, long pinned) { + this.used = used; + this.pinned = pinned; + this.ratio = (short) 
Math.round(((double) pinned / used) * 100); + } + + static DirectMemoryUsage capture() { + PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + long usedDirectMemory = allocator.metric().usedDirectMemory(); + long pinnedDirectMemory = allocator.pinnedDirectMemory(); + return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory); + } + } + + private final static Logger logger = LogManager.getLogger(OOMConnectionCloser.class); + + public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$"); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (isDirectMemoryOOM(cause)) { + DirectMemoryUsage direct = DirectMemoryUsage.capture(); + logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); + logger.warn("Dropping connection {} because run out of direct memory. To fix it, in Filebeat you can" + + "enable slow_start, reduce number of workers, reduce the bulk_max_size or even raise up the -XX:MaxDirectMemorySize " + + "option in the JVM running Logstash", ctx.channel()); + ctx.flush(); + ctx.close(); + } else { + super.exceptionCaught(ctx, cause); + } + } + + private boolean isDirectMemoryOOM(Throwable th) { + if (!(th instanceof OutOfMemoryError)) { + return false; + } + Matcher m = DIRECT_MEMORY_ERROR.matcher(th.getMessage()); + return m.matches(); + } +} \ No newline at end of file diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c791185c..e456640d 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -1,6 +1,8 @@ package org.logstash.beats; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import 
io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -135,6 +137,7 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); + pipeline.addLast(new OOMConnectionCloser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } From e34bf25d3d865eb32ad59d54c7c2aaa4f6efd1ab Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 28 Jul 2023 10:50:38 +0200 Subject: [PATCH 12/19] Removed from the log string any reference to Filebeat --- src/main/java/org/logstash/beats/OOMConnectionCloser.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 5ba3effb..7336b666 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -40,9 +40,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (isDirectMemoryOOM(cause)) { DirectMemoryUsage direct = DirectMemoryUsage.capture(); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); - logger.warn("Dropping connection {} because run out of direct memory. To fix it, in Filebeat you can" + - "enable slow_start, reduce number of workers, reduce the bulk_max_size or even raise up the -XX:MaxDirectMemorySize " + - "option in the JVM running Logstash", ctx.channel()); + logger.warn("Dropping connection {} because run out of direct memory. To fix it, check if the upstream source " + + "has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. 
" + + "Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); ctx.flush(); ctx.close(); } else { From 7a6982e1541cfc09816c827af5829f9968293c46 Mon Sep 17 00:00:00 2001 From: andsel Date: Fri, 28 Jul 2023 11:05:21 +0200 Subject: [PATCH 13/19] Raised up the log level level when dropping connections becuase of thundering and going to OOM condition --- src/main/java/org/logstash/beats/ThunderingGuardHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java index 397d8acf..a880ed35 100644 --- a/src/main/java/org/logstash/beats/ThunderingGuardHandler.java +++ b/src/main/java/org/logstash/beats/ThunderingGuardHandler.java @@ -30,7 +30,7 @@ public void channelRegistered(final ChannelHandlerContext ctx) throws Exception long pinnedDirectMemory = pooledAllocator.pinnedDirectMemory(); if (pinnedDirectMemory >= usedDirectMemory * 0.80) { ctx.close(); - logger.info("Dropping connection {} due to high resource consumption", ctx.channel()); + logger.warn("Dropping connection {} due to high resource consumption", ctx.channel()); return; } } From 299ee27cb33060cd681ebd9e68dcc332f4303acc Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Fri, 28 Jul 2023 13:48:33 +0200 Subject: [PATCH 14/19] Better actionable suggestion to user in case of OOM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- src/main/java/org/logstash/beats/OOMConnectionCloser.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 7336b666..57a69802 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -40,9 +40,8 @@ public void 
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E if (isDirectMemoryOOM(cause)) { DirectMemoryUsage direct = DirectMemoryUsage.capture(); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); - logger.warn("Dropping connection {} because run out of direct memory. To fix it, check if the upstream source " + - "has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. " + - "Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); + logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " + + "Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); ctx.flush(); ctx.close(); } else { From e24f33951c75c684a540cc49329805153ef7bca5 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 14:06:53 +0200 Subject: [PATCH 15/19] Updated OOMConnectionCloser to monitor the consumption of memory also during the read operation and not only on exception --- .../java/org/logstash/beats/BeatsHandler.java | 5 ++ .../logstash/beats/OOMConnectionCloser.java | 47 +++++++++++++++---- src/main/java/org/logstash/beats/Server.java | 2 +- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 15dfb7e9..caa5054a 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -1,5 +1,7 @@ package org.logstash.beats; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -92,6 +94,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws E logger.info(format("closing (" + cause.getMessage() + ")")); } } else { + PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + OOMConnectionCloser.DirectMemoryUsage usageSnapshot = OOMConnectionCloser.DirectMemoryUsage.capture(allocator); + logger.info("Connection {}, memory status used: {}, pinned: {}, ratio {}", ctx.channel(), usageSnapshot.used, usageSnapshot.pinned, usageSnapshot.ratio); final Throwable realCause = extractCause(cause, 0); if (logger.isDebugEnabled()){ logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause); diff --git a/src/main/java/org/logstash/beats/OOMConnectionCloser.java b/src/main/java/org/logstash/beats/OOMConnectionCloser.java index 57a69802..751aba53 100644 --- a/src/main/java/org/logstash/beats/OOMConnectionCloser.java +++ b/src/main/java/org/logstash/beats/OOMConnectionCloser.java @@ -4,6 +4,8 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.PlatformDependent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,22 +14,31 @@ public class OOMConnectionCloser extends ChannelInboundHandlerAdapter { - private static class DirectMemoryUsage { - private final long used; - private final long pinned; - private final short ratio; + private final PooledByteBufAllocator allocator; - private DirectMemoryUsage(long used, long pinned) { + static class DirectMemoryUsage { + final long used; + final long pinned; + private final PooledByteBufAllocator allocator; + final short ratio; + + private DirectMemoryUsage(long used, long pinned, PooledByteBufAllocator allocator) { this.used = used; this.pinned = pinned; + this.allocator = allocator; this.ratio = (short) Math.round(((double) pinned / used) * 100); } - static DirectMemoryUsage capture() { - 
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + static DirectMemoryUsage capture(PooledByteBufAllocator allocator) { long usedDirectMemory = allocator.metric().usedDirectMemory(); long pinnedDirectMemory = allocator.pinnedDirectMemory(); - return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory); + return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory, allocator); + } + + boolean isCloseToOOM() { + long maxDirectMemory = PlatformDependent.maxDirectMemory(); + int chunkSize = allocator.metric().chunkSize(); + return ((maxDirectMemory - used) <= chunkSize) && ratio > 75; } } @@ -35,10 +46,28 @@ static DirectMemoryUsage capture() { public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$"); + OOMConnectionCloser() { + allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator); + logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); + if (direct.isCloseToOOM()) { + logger.warn("Closing connection {} because running out of memory, used: {}, pinned: {}, ratio {}", ctx.channel(), direct.used, direct.pinned, direct.ratio); + ReferenceCountUtil.release(msg); // to free the memory used by the buffer + ctx.flush(); + ctx.close(); + } else { + super.channelRead(ctx, msg); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (isDirectMemoryOOM(cause)) { - DirectMemoryUsage direct = DirectMemoryUsage.capture(); + DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator); logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio); logger.warn("Dropping connection {} due to lack of available Direct Memory. 
Please lower the number of concurrent connections or reduce the batch size. " + "Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel()); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index e456640d..90bc733b 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -126,6 +126,7 @@ private class BeatsInitializer extends ChannelInitializer { public void initChannel(SocketChannel socket){ ChannelPipeline pipeline = socket.pipeline(); + pipeline.addLast(new OOMConnectionCloser()); if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); @@ -137,7 +138,6 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); pipeline.addLast(new BeatsParser()); - pipeline.addLast(new OOMConnectionCloser()); pipeline.addLast(new BeatsHandler(localMessageListener)); } From c7c54d9b83e28ddc29155eb7a7243bd3ce68ec72 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 14:32:27 +0200 Subject: [PATCH 16/19] Re-introduce the beats handlers worker group to separate the Beats protocol processing from the boss group that accepts and listens to new sockets --- src/main/java/org/logstash/beats/Server.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 90bc733b..1254ed9e 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -1,9 +1,11 @@ package org.logstash.beats; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import
io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -114,6 +116,7 @@ private class BeatsInitializer extends ChannelInitializer { private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; private final EventExecutorGroup idleExecutorGroup; + private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; @@ -122,6 +125,7 @@ private class BeatsInitializer extends ChannelInitializer { this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); + beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket){ @@ -137,8 +141,8 @@ public void initChannel(SocketChannel socket){ pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); pipeline.addLast(new FlowLimiterHandler()); pipeline.addLast(new ThunderingGuardHandler()); - pipeline.addLast(new BeatsParser()); - pipeline.addLast(new BeatsHandler(localMessageListener)); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser()); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener)); } From 16a76b8090f87a5e7c2336b80c52c12f36f3b56b Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 15:35:52 +0200 Subject: [PATCH 17/19] Added feature flag named protect_direct_memory to control the usage of OOM checking or not. Enabled by default. 
--- docs/index.asciidoc | 12 ++++++++++++ lib/logstash/inputs/beats.rb | 6 +++++- spec/inputs/beats_spec.rb | 7 ++++--- src/main/java/org/logstash/beats/Runner.java | 2 +- src/main/java/org/logstash/beats/Server.java | 18 ++++++++++++++---- .../java/org/logstash/beats/ServerTest.java | 6 +++--- 6 files changed, 39 insertions(+), 12 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index d1721ed5..b6647751 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -221,6 +221,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|__Deprecated__ | <> |<>|Yes +| <> |<>|No | <> |<>|__Deprecated__ | <> |a valid filesystem path|No | <> |<>|No @@ -384,6 +385,17 @@ deprecated[6.5.0, Replaced by <>] The port to listen on. +[id="plugins-{type}s-{plugin}-protect_direct_memory"] +===== `protect_direct_memory` + + * Value type is <> + * Default value is `true` + +If enabled, actively check native memory used by network part to do parsing and avoid +out of memory conditions. When the consumption of native memory used is close to +the maximum limit, connections are being closed in undetermined order until the safe +memory condition is reestablished. + [id="plugins-{type}s-{plugin}-ssl"] ===== `ssl` deprecated[6.6.0, Replaced by <>] diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index 3a302bfd..0d59fad0 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -74,6 +74,10 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base # The port to listen on. config :port, :validate => :number, :required => true + # Proactive checks that keep the beats input active when the memory used by protocol parser and network + # related operations is going to terminate. + config :protect_direct_memory, :validate => :boolean, :default => true + # Events are by default sent in plain text. 
You can # enable encryption by setting `ssl` to true and configuring # the `ssl_certificate` and `ssl_key` options. @@ -243,7 +247,7 @@ def register end # def register def create_server - server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads) + server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, @protect_direct_memory) server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled server end diff --git a/spec/inputs/beats_spec.rb b/spec/inputs/beats_spec.rb index f68a33da..7c0694da 100644 --- a/spec/inputs/beats_spec.rb +++ b/spec/inputs/beats_spec.rb @@ -14,6 +14,7 @@ let(:port) { BeatsInputTest.random_port } let(:client_inactivity_timeout) { 400 } let(:threads) { 1 + rand(9) } + let(:protect_direct_memory) { true } let(:queue) { Queue.new } let(:config) do { @@ -36,7 +37,7 @@ let(:port) { 9001 } it "sends the required options to the server" do - expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads) + expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory) subject.register end end @@ -529,8 +530,8 @@ subject(:plugin) { LogStash::Inputs::Beats.new(config) } before do - @server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads) - expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads).and_return @server + @server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads, protect_direct_memory) + expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory).and_return @server expect( @server ).to receive(:listen) subject.register diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index 0cb623e4..548f6ef1 
100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception { // Check for leaks. // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); - Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors()); + Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), true); if(args.length > 0 && args[0].equals("ssl")) { logger.debug("Using SSL"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 1254ed9e..983d80a9 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -22,6 +22,7 @@ public class Server { private final int port; private final String host; private final int beatsHeandlerThreadCount; + private final boolean protectDirectMemory; private NioEventLoopGroup workGroup; private IMessageListener messageListener = new MessageListener(); private SslHandlerProvider sslHandlerProvider; @@ -29,11 +30,16 @@ public class Server { private final int clientInactivityTimeoutSeconds; - public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) { +// public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) { +// this(host, p, clientInactivityTimeoutSeconds, threadCount, true); +// } + + public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, boolean protectDirectMemory) { this.host = host; port = p; this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; beatsHeandlerThreadCount = threadCount; + this.protectDirectMemory = protectDirectMemory; } public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){ @@ -130,7 +136,9 @@ private class BeatsInitializer extends ChannelInitializer { public void initChannel(SocketChannel socket){ 
ChannelPipeline pipeline = socket.pipeline(); - pipeline.addLast(new OOMConnectionCloser()); + if (protectDirectMemory) { + pipeline.addLast(new OOMConnectionCloser()); + } if (isSslEnabled()) { pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket)); @@ -139,8 +147,10 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(new FlowLimiterHandler()); - pipeline.addLast(new ThunderingGuardHandler()); + if (protectDirectMemory) { + pipeline.addLast(new FlowLimiterHandler()); + pipeline.addLast(new ThunderingGuardHandler()); + } pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser()); pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener)); } diff --git a/src/test/java/org/logstash/beats/ServerTest.java b/src/test/java/org/logstash/beats/ServerTest.java index 37512cdc..b067e55d 100644 --- a/src/test/java/org/logstash/beats/ServerTest.java +++ b/src/test/java/org/logstash/beats/ServerTest.java @@ -50,7 +50,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte final CountDownLatch latch = new CountDownLatch(concurrentConnections); - final Server server = new Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, true); final AtomicBoolean otherCause = new AtomicBoolean(false); server.setMessageListener(new MessageListener() { public void onNewConnection(ChannelHandlerContext ctx) { @@ -114,7 +114,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt final CountDownLatch latch = new CountDownLatch(concurrentConnections); final AtomicBoolean exceptionClose = new AtomicBoolean(false); - final Server server = new 
Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, true); server.setMessageListener(new MessageListener() { @Override public void onNewConnection(ChannelHandlerContext ctx) { @@ -170,7 +170,7 @@ public void run() { @Test public void testServerShouldAcceptConcurrentConnection() throws InterruptedException { - final Server server = new Server(host, randomPort, 30, threadCount); + final Server server = new Server(host, randomPort, 30, threadCount, true); SpyListener listener = new SpyListener(); server.setMessageListener(listener); Runnable serverTask = new Runnable() { From d75a0df3013c9612c967b591debf5adc4bb607f6 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 20 Sep 2023 17:58:24 +0200 Subject: [PATCH 18/19] Throw a configuration error if Netty reserved direct memory is not enough (256MB) --- lib/logstash/inputs/beats.rb | 2 ++ src/main/java/org/logstash/beats/Server.java | 19 +++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index 0d59fad0..d1f8a6c5 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -250,6 +250,8 @@ def create_server server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, @protect_direct_memory) server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled server + rescue java.lang.IllegalArgumentException => e + configuration_error e.message end def run(output_queue) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 983d80a9..162216de 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -12,6 +12,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import
io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.internal.PlatformDependent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.logstash.netty.SslHandlerProvider; @@ -30,16 +31,26 @@ public class Server { private final int clientInactivityTimeoutSeconds; -// public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) { -// this(host, p, clientInactivityTimeoutSeconds, threadCount, true); -// } - public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, boolean protectDirectMemory) { this.host = host; port = p; this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; beatsHeandlerThreadCount = threadCount; this.protectDirectMemory = protectDirectMemory; + + validateMinimumDirectMemory(); + } + + /** + * Validate if the configured available direct memory is enough for safe processing, else throws an IllegalArgumentException + * */ + private void validateMinimumDirectMemory() { + long maxDirectMemoryAllocatable = PlatformDependent.maxDirectMemory(); + if (maxDirectMemoryAllocatable < 256 * 1024 * 1024) { + long roundedMegabytes = Math.round((double) maxDirectMemoryAllocatable / 1024 / 1024); + throw new IllegalArgumentException("Max direct memory should be at least 256MB but was " + roundedMegabytes + "MB, " + + "please check your MaxDirectMemorySize and io.netty.maxDirectMemory settings"); + } } public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){ From 68f49678015cffb567af07f2f95cfeec1c171d22 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 27 Sep 2023 10:30:25 +0200 Subject: [PATCH 19/19] Add missing shutdown of beat's worker loop --- src/main/java/org/logstash/beats/Server.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 162216de..b15741b6 100644 --- a/src/main/java/org/logstash/beats/Server.java +++
b/src/main/java/org/logstash/beats/Server.java @@ -181,6 +181,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); + beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); }