diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index 7c99d589d72..20c3946b5ff 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -144,6 +144,12 @@ static int in_fw_collect(struct flb_input_instance *ins, return -1; } + if(ctx->is_paused) { + flb_downstream_conn_release(connection); + flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd); + return -1; + } + flb_plg_trace(ins, "new TCP connection arrived FD=%i", connection->fd); conn = fw_conn_add(connection, ctx); @@ -265,6 +271,9 @@ static int in_fw_init(struct flb_input_instance *ins, /* Set the context */ flb_input_set_context(ins, ctx); + /* Set plugin ingestion to active */ + ctx->is_paused = FLB_FALSE; + /* Unix Socket mode */ if (ctx->unix_path) { #ifndef FLB_HAVE_UNIX_SOCKET @@ -348,6 +357,17 @@ static int in_fw_init(struct flb_input_instance *ins, static void in_fw_pause(void *data, struct flb_config *config) { struct flb_in_fw_config *ctx = data; + if (config->is_running == FLB_TRUE) { + /* + * This is the case when we are not in a shutdown phase, but + * backpressure built up, and the plugin needs to + * pause the ingestion. The plugin should close all the connections + * and wait for the ingestion to resume. + */ + flb_input_collector_pause(ctx->coll_fd, ctx->ins); + fw_conn_del_all(ctx); + ctx->is_paused = FLB_TRUE; + } /* * If the plugin is paused AND the ingestion not longer active, @@ -362,6 +382,15 @@ static void in_fw_pause(void *data, struct flb_config *config) } } +static void in_fw_resume(void *data, struct flb_config *config) { + struct flb_in_fw_config *ctx = data; + if (config->is_running == FLB_TRUE) { + ctx->is_paused = FLB_FALSE; + flb_input_collector_resume(ctx->coll_fd, ctx->ins); + } +} + + static int in_fw_exit(void *data, struct flb_config *config) { (void) *config; @@ -431,6 +460,7 @@ struct flb_input_plugin in_forward_plugin = { .cb_collect = in_fw_collect, .cb_flush_buf = NULL, .cb_pause = in_fw_pause, + .cb_resume = in_fw_resume, .cb_exit = in_fw_exit, .config_map = config_map, .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 70c5de57775..4cd270e27a2 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -71,6 +71,9 @@ struct flb_in_fw_config { struct flb_log_event_decoder *log_decoder; struct flb_log_event_encoder *log_encoder; + + /* Plugin is paused */ + int is_paused; }; #endif