From 28eb678a231abb9167b3e52e32837514625d024e Mon Sep 17 00:00:00 2001
From: Thiago Padilha
Date: Mon, 6 Mar 2023 16:08:44 -0300
Subject: [PATCH] in_kafka: refactor implementation to use
 flb_input_set_collector_time

This is a move away from the `flb_input_thread` API; the new
implementation is simpler and just uses `flb_input_set_collector_time`
to poll librdkafka for messages.

Signed-off-by: Thiago Padilha
---
 plugins/in_kafka/in_kafka.c | 106 ++++++++++++++++++++++--------------
 plugins/in_kafka/in_kafka.h |   1 -
 2 files changed, 64 insertions(+), 43 deletions(-)

diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c
index 0c4218ee589..50efcc91797 100644
--- a/plugins/in_kafka/in_kafka.c
+++ b/plugins/in_kafka/in_kafka.c
@@ -26,13 +26,13 @@
 #include
 #include
 #include
-#include <fluent-bit/flb_input_thread.h>
 #include
 #include
 
 #include "fluent-bit/flb_input.h"
 #include "fluent-bit/flb_kafka.h"
+#include "fluent-bit/flb_mem.h"
 #include "in_kafka.h"
 #include "rdkafka.h"
 
@@ -56,7 +56,7 @@ static int try_json(mpack_writer_t *writer, rd_kafka_message_t *rkm)
 }
 
 static void process_message(mpack_writer_t *writer,
-        rd_kafka_message_t *rkm)
+                            rd_kafka_message_t *rkm)
 {
     struct flb_time t;
 
@@ -70,7 +70,8 @@ static void process_message(mpack_writer_t *writer,
     mpack_write_cstr(writer, "topic");
     if (rkm->rkt) {
         mpack_write_cstr(writer, rd_kafka_topic_name(rkm->rkt));
-    } else {
+    }
+    else {
         mpack_write_nil(writer);
     }
 
@@ -83,14 +84,16 @@
     mpack_write_cstr(writer, "error");
     if (rkm->err) {
         mpack_write_cstr(writer, rd_kafka_message_errstr(rkm));
-    } else {
+    }
+    else {
         mpack_write_nil(writer);
     }
 
     mpack_write_cstr(writer, "key");
     if (rkm->key) {
         mpack_write_str(writer, rkm->key, rkm->key_len);
-    } else {
+    }
+    else {
         mpack_write_nil(writer);
     }
 
@@ -99,29 +102,59 @@
         if (try_json(writer, rkm)) {
             mpack_write_str(writer, rkm->payload, rkm->len);
         }
-    } else {
+    }
+    else {
         mpack_write_nil(writer);
     }
 
     mpack_writer_flush_message(writer);
 }
 
-static void in_kafka_callback(int write_fd, void *data)
+static int in_kafka_collect(struct flb_input_instance *ins,
+                            struct flb_config *config, void *in_context)
 {
-    struct flb_input_thread *it = data;
-    struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
-    mpack_writer_t *writer = &ctx->it.writer;
-
-    while (!flb_input_thread_exited(it)) {
-        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);
-
-        if (rkm) {
-            process_message(writer, rkm);
-            fflush(ctx->it.write_file);
-            rd_kafka_message_destroy(rkm);
-            rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+    mpack_writer_t writer;
+    char *buf;
+    size_t bufsize;
+    size_t written = 0;
+    struct flb_in_kafka_config *ctx = in_context;
+
+    mpack_writer_init_growable(&writer, &buf, &bufsize);
+
+    if (writer.error == mpack_error_memory) {
+        flb_plg_error(ins, "Failed to allocate buffer.");
+        return -1;
+    }
+
+    while (true) {
+        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
+
+        if (!rkm) {
+            break;
+        }
+
+        flb_plg_debug(ins, "kafka message received");
+        process_message(&writer, rkm);
+        rd_kafka_message_destroy(rkm);
+        rd_kafka_commit(ctx->kafka.rk, NULL, 0);
+
+        if (writer.error == mpack_error_memory) {
+            flb_plg_error(ins, "Failed to allocate buffer.");
+            return -1;
        }
     }
+
+    written = writer.position - writer.buffer;
+
+    if (written == 0) {
+        mpack_writer_destroy(&writer);
+        return -1;
+    }
+
+    flb_input_log_append(ins, NULL, 0, writer.buffer, written);
+    mpack_writer_destroy(&writer);
+
+    return 0;
 }
 
 /* Initialize plugin */
@@ -176,27 +209,18 @@ static int in_kafka_init(struct flb_input_instance *ins,
     rd_kafka_topic_partition_list_destroy(kafka_topics);
     kafka_topics = NULL;
 
-    /* create worker thread */
-    ret = flb_input_thread_init(&ctx->it, in_kafka_callback, &ctx->it);
-    if (ret) {
-        flb_errno();
-        flb_plg_error(ins, "Could not initialize worker thread");
-        goto init_error;
-    }
-
     /* Set the context */
-    flb_input_set_context(ins, &ctx->it);
-
+    flb_input_set_context(ins, ctx);
     /* Collect upon data available on the pipe read fd */
-    ret = flb_input_set_collector_event(ins,
-                                        flb_input_thread_collect,
-                                        ctx->it.read,
-                                        config);
-    if (ret == -1) {
-        flb_plg_error(ins, "Could not set collector for thread dummy input plugin");
+
+    ret = flb_input_set_collector_time(ins,
+                                       in_kafka_collect,
+                                       0, 5e8,
+                                       config);
+    if (ret) {
+        flb_plg_error(ctx->ins, "could not set collector for kafka input plugin");
         goto init_error;
     }
-    ctx->it.coll_fd = ret;
 
     return 0;
 
@@ -206,7 +230,8 @@ static int in_kafka_init(struct flb_input_instance *ins,
     }
     if (ctx->kafka.rk) {
         rd_kafka_destroy(ctx->kafka.rk);
-    } else if (kafka_conf) {
+    }
+    else if (kafka_conf) {
         // conf is already destroyed when rd_kafka is initialized
         rd_kafka_conf_destroy(kafka_conf);
     }
@@ -218,16 +243,13 @@
 /* Cleanup serial input */
 static int in_kafka_exit(void *in_context, struct flb_config *config)
 {
-    struct flb_input_thread *it;
     struct flb_in_kafka_config *ctx;
 
     if (!in_context) {
         return 0;
     }
 
-    it = in_context;
-    ctx = (in_context - offsetof(struct flb_in_kafka_config, it));
-    flb_input_thread_destroy(it, ctx->ins);
+    ctx = in_context;
     rd_kafka_destroy(ctx->kafka.rk);
     flb_free(ctx->kafka.brokers);
     flb_free(ctx);
@@ -241,7 +263,7 @@ struct flb_input_plugin in_kafka_plugin = {
     .description  = "Kafka consumer input plugin",
     .cb_init      = in_kafka_init,
     .cb_pre_run   = NULL,
-    .cb_collect   = flb_input_thread_collect,
+    .cb_collect   = in_kafka_collect,
     .cb_flush_buf = NULL,
     .cb_exit      = in_kafka_exit
 };
diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h
index 6ee08875163..11b6e6964b4 100644
--- a/plugins/in_kafka/in_kafka.h
+++ b/plugins/in_kafka/in_kafka.h
@@ -30,7 +30,6 @@
 struct flb_in_kafka_config {
     struct flb_kafka kafka;
     struct flb_input_instance *ins;
-    struct flb_input_thread it;
     struct flb_log_event_encoder *log_encoder;
 };
 
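
For reference, the collector registered above with flb_input_set_collector_time(ins, in_kafka_collect, 0, 5e8, config) fires every 0 s + 5e8 ns, i.e. roughly every 500 ms, and each invocation drains whatever librdkafka has buffered, waiting at most 1 ms per poll so the event loop is not blocked. The sketch below shows that drain pattern in isolation, in plain librdkafka terms; it is illustrative only and not part of the patch: the helper name drain_consumer is made up, an already-subscribed rd_kafka_t consumer handle is assumed, and the real collector serializes each message with mpack and hands it to flb_input_log_append instead of printing it.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Illustrative sketch only: drain buffered messages without blocking for
 * long, mirroring the loop inside in_kafka_collect() above. */
static int drain_consumer(rd_kafka_t *rk)
{
    int count = 0;

    while (1) {
        /* 1 ms timeout so an empty queue returns almost immediately */
        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(rk, 1);

        if (!rkm) {
            break;
        }

        if (!rkm->err) {
            printf("got %zu byte message from topic %s\n",
                   rkm->len, rd_kafka_topic_name(rkm->rkt));
            count++;
        }

        rd_kafka_message_destroy(rkm);

        /* synchronous commit of the current offsets, as the plugin does */
        rd_kafka_commit(rk, NULL, 0);
    }

    return count;
}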