From 3ec6b25220c8185a4e4d6a65a79b2556c6e4b363 Mon Sep 17 00:00:00 2001 From: sahusanket Date: Mon, 11 Nov 2024 20:50:04 +0530 Subject: [PATCH] CDAP-21071 : Adding backpressure while for HttpRequestRouter using channelWritabilityChanged --- .../router/handlers/HttpRequestRouter.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/handlers/HttpRequestRouter.java b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/handlers/HttpRequestRouter.java index 3fc932fc2f74..5c0ca922d71e 100644 --- a/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/handlers/HttpRequestRouter.java +++ b/cdap-gateway/src/main/java/io/cdap/cdap/gateway/router/handlers/HttpRequestRouter.java @@ -192,6 +192,30 @@ public void channelInactive(ChannelHandlerContext ctx) { ctx.fireChannelInactive(); } + /** + * [CDAP-21071] Handles the case by stopping the outbound channel of Service -> Router + * when the response from Service -> Router is faster than the response from Router to the Client. + */ + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (inflightRequests > 0 && currentMessageSender.outboundChannel != null) { + final Channel outboundChannel = ctx.channel(); + ctx.executor().execute(() -> { + // If outboundChannel is not saturated anymore, continue accepting + // the incoming traffic from the outbound channel for service<>router. + if (outboundChannel.isWritable()) { + LOG.trace("Setting message sender's outboundChannel readable."); + currentMessageSender.outboundChannel.config().setAutoRead(true); + } else { + // If outboundChannel is saturated, do not read inboundChannel + LOG.trace("Setting message sender's outboundChannel non-readable."); + currentMessageSender.outboundChannel.config().setAutoRead(false); + } + }); + } + ctx.fireChannelWritabilityChanged(); + } + private ChannelFutureListener getFailureResponseListener(final Channel inboundChannel) { if (failureResponseListener == null) { failureResponseListener = new ChannelFutureListener() {