Skip to content

Commit

Permalink
in_forward: Add plugin pause/resume mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Treu <[email protected]>
  • Loading branch information
drbugfinder-work authored and edsiper committed Jul 12, 2024
1 parent 88545a0 commit a6feacd
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
30 changes: 30 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ static int in_fw_collect(struct flb_input_instance *ins,
return -1;
}

if(ctx->is_paused) {
flb_downstream_conn_release(connection);
flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
return -1;
}

flb_plg_trace(ins, "new TCP connection arrived FD=%i", connection->fd);

conn = fw_conn_add(connection, ctx);
Expand Down Expand Up @@ -265,6 +271,9 @@ static int in_fw_init(struct flb_input_instance *ins,
/* Set the context */
flb_input_set_context(ins, ctx);

/* Set plugin ingestion to active */
ctx->is_paused = FLB_FALSE;

/* Unix Socket mode */
if (ctx->unix_path) {
#ifndef FLB_HAVE_UNIX_SOCKET
Expand Down Expand Up @@ -348,6 +357,17 @@ static int in_fw_init(struct flb_input_instance *ins,
static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
}

/*
* If the plugin is paused AND the ingestion not longer active,
Expand All @@ -362,6 +382,15 @@ static void in_fw_pause(void *data, struct flb_config *config)
}
}

static void in_fw_resume(void *data, struct flb_config *config) {
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
ctx->is_paused = FLB_FALSE;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}
}


static int in_fw_exit(void *data, struct flb_config *config)
{
(void) *config;
Expand Down Expand Up @@ -431,6 +460,7 @@ struct flb_input_plugin in_forward_plugin = {
.cb_collect = in_fw_collect,
.cb_flush_buf = NULL,
.cb_pause = in_fw_pause,
.cb_resume = in_fw_resume,
.cb_exit = in_fw_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct flb_in_fw_config {

struct flb_log_event_decoder *log_decoder;
struct flb_log_event_encoder *log_encoder;

/* Plugin is paused */
int is_paused;
};

#endif

0 comments on commit a6feacd

Please sign in to comment.