Skip to content

Commit

Permalink
in_kafka: Make poll interval configurable and use config map
Browse files Browse the repository at this point in the history
Signed-off-by: Thiago Padilha <[email protected]>
  • Loading branch information
tchrono authored and edsiper committed Apr 18, 2023
1 parent 28eb678 commit 341ddbc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
1 change: 1 addition & 0 deletions examples/kafka_filter/kafka.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Name kafka
brokers kafka-broker:9092
topics fb-source
poll_ms 100

[FILTER]
Name lua
Expand Down
53 changes: 51 additions & 2 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ static int in_kafka_init(struct flb_input_instance *ins,
if (!ctx) {
return -1;
}
ctx->ins = ins;

ret = flb_input_config_map_set(ins, (void*) ctx);
if (ret == -1) {
flb_plg_error(ins, "unable to load configuration.");
flb_free(ctx);
return -1;
}

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
flb_plg_error(ins, "Could not initialize kafka config object");
Expand Down Expand Up @@ -213,9 +222,12 @@ static int in_kafka_init(struct flb_input_instance *ins,
flb_input_set_context(ins, ctx);
/* Collect upon data available on the pipe read fd */

int poll_seconds = ctx->poll_ms / 1000;
int poll_milliseconds = ctx->poll_ms % 1000;

ret = flb_input_set_collector_time(ins,
in_kafka_collect,
0, 5e8,
poll_seconds, poll_milliseconds * 1e6,
config);
if (ret) {
flb_plg_error(ctx->ins, "could not set collector for kafka input plugin");
Expand Down Expand Up @@ -257,6 +269,42 @@ static int in_kafka_exit(void *in_context, struct flb_config *config)
return 0;
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_INT, "poll_ms", FLB_IN_KAFKA_DEFAULT_POLL_MS,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, poll_ms),
"Interval in milliseconds to check for new messages."
},
{
FLB_CONFIG_MAP_STR, "topics", (char *)NULL,
0, FLB_FALSE, 0,
"Set the kafka topics, delimited by commas."
},
{
FLB_CONFIG_MAP_STR, "brokers", (char *)NULL,
0, FLB_FALSE, 0,
"Set the kafka brokers, delimited by commas."
},
{
FLB_CONFIG_MAP_STR, "client_id", (char *)NULL,
0, FLB_FALSE, 0,
"Set the kafka client_id."
},
{
FLB_CONFIG_MAP_STR, "group_id", (char *)NULL,
0, FLB_FALSE, 0,
"Set the kafka group_id."
},
{
FLB_CONFIG_MAP_STR_PREFIX, "rdkafka.", NULL,
//FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct flb_in_kafka_config, rdkafka_opts),
0, FLB_FALSE, 0,
"Set the librdkafka options"
},
/* EOF */
{0}
};

/* Plugin reference */
struct flb_input_plugin in_kafka_plugin = {
.name = "kafka",
Expand All @@ -265,5 +313,6 @@ struct flb_input_plugin in_kafka_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_kafka_collect,
.cb_flush_buf = NULL,
.cb_exit = in_kafka_exit
.cb_exit = in_kafka_exit,
.config_map = config_map
};
4 changes: 4 additions & 0 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
#include <fluent-bit/flb_kafka.h>
#include <fluent-bit/flb_log_event_encoder.h>


#define FLB_IN_KAFKA_DEFAULT_POLL_MS "500"

struct flb_in_kafka_config {
struct flb_kafka kafka;
struct flb_input_instance *ins;
struct flb_log_event_encoder *log_encoder;
int poll_ms;
};

#endif

0 comments on commit 341ddbc

Please sign in to comment.