in_kafka: refactor implementation to use flb_input_set_collector_time
This is a move away from the `flb_input_thread` API; the new
implementation is simpler and just uses `flb_input_set_collector_time`
to poll librdkafka for messages.

Signed-off-by: Thiago Padilha <[email protected]>
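
For readers unfamiliar with the collector API this commit switches to: instead of spawning a dedicated worker thread, the plugin registers a callback with `flb_input_set_collector_time()` and the Fluent Bit engine invokes it on a fixed interval (in this commit, 0 seconds + 5e8 nanoseconds, i.e. every 500 ms). Below is a minimal sketch of that pattern; the Fluent Bit calls are the ones used in the diff, while the `example_*` names, the trimmed context struct, and the exact headers are illustrative and may differ across Fluent Bit versions.

#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_input_plugin.h>

/* Illustrative context; the real plugin context lives in in_kafka.h */
struct example_ctx {
    struct flb_input_instance *ins;
};

/* Collector callback: invoked by the engine on every timer tick */
static int example_collect(struct flb_input_instance *ins,
                           struct flb_config *config, void *in_context)
{
    struct example_ctx *ctx = in_context;
    (void) config;

    /* poll the data source here and append records to the engine */
    flb_plg_debug(ins, "collector tick, ctx=%p", (void *) ctx);
    return 0;
}

/* At init time, register the callback to run every 500 ms
 * (0 seconds + 5e8 nanoseconds), as the new in_kafka code does */
static int example_register(struct flb_input_instance *ins,
                            struct flb_config *config,
                            struct example_ctx *ctx)
{
    int ret;

    flb_input_set_context(ins, ctx);
    ret = flb_input_set_collector_time(ins, example_collect, 0, 5e8, config);
    if (ret == -1) {
        flb_plg_error(ins, "could not set up the timer collector");
        return -1;
    }
    return 0;
}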
tchrono authored and edsiper committed Apr 18, 2023
1 parent b57b8a9 commit 28eb678
Showing 2 changed files with 64 additions and 43 deletions.
106 changes: 64 additions & 42 deletions plugins/in_kafka/in_kafka.c
@@ -26,13 +26,13 @@
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_error.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_input_thread.h>
#include <mpack/mpack.h>
#include <stddef.h>
#include <stdio.h>

#include "fluent-bit/flb_input.h"
#include "fluent-bit/flb_kafka.h"
#include "fluent-bit/flb_mem.h"
#include "in_kafka.h"
#include "rdkafka.h"

@@ -56,7 +56,7 @@ static int try_json(mpack_writer_t *writer, rd_kafka_message_t *rkm)
}

static void process_message(mpack_writer_t *writer,
rd_kafka_message_t *rkm)
rd_kafka_message_t *rkm)
{
struct flb_time t;

@@ -70,7 +70,8 @@ static void process_message(mpack_writer_t *writer,
mpack_write_cstr(writer, "topic");
if (rkm->rkt) {
mpack_write_cstr(writer, rd_kafka_topic_name(rkm->rkt));
} else {
}
else {
mpack_write_nil(writer);
}

@@ -83,14 +84,16 @@ static void process_message(mpack_writer_t *writer,
mpack_write_cstr(writer, "error");
if (rkm->err) {
mpack_write_cstr(writer, rd_kafka_message_errstr(rkm));
} else {
}
else {
mpack_write_nil(writer);
}

mpack_write_cstr(writer, "key");
if (rkm->key) {
mpack_write_str(writer, rkm->key, rkm->key_len);
} else {
}
else {
mpack_write_nil(writer);
}

@@ -99,29 +102,59 @@ static void process_message(mpack_writer_t *writer,
if (try_json(writer, rkm)) {
mpack_write_str(writer, rkm->payload, rkm->len);
}
} else {
}
else {
mpack_write_nil(writer);
}

mpack_writer_flush_message(writer);
}

static void in_kafka_callback(int write_fd, void *data)
static int in_kafka_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
struct flb_input_thread *it = data;
struct flb_in_kafka_config *ctx = data - offsetof(struct flb_in_kafka_config, it);
mpack_writer_t *writer = &ctx->it.writer;

while (!flb_input_thread_exited(it)) {
rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 500);

if (rkm) {
process_message(writer, rkm);
fflush(ctx->it.write_file);
rd_kafka_message_destroy(rkm);
rd_kafka_commit(ctx->kafka.rk, NULL, 0);
mpack_writer_t writer;
char *buf;
size_t bufsize;
size_t written = 0;
struct flb_in_kafka_config *ctx = in_context;

mpack_writer_init_growable(&writer, &buf, &bufsize);

if (writer.error == mpack_error_memory) {
flb_plg_error(ins, "Failed to allocate buffer.");
return -1;
}

while (true) {
rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);

if (!rkm) {
break;
}

flb_plg_debug(ins, "kafka message received");
process_message(&writer, rkm);
rd_kafka_message_destroy(rkm);
rd_kafka_commit(ctx->kafka.rk, NULL, 0);

if (writer.error == mpack_error_memory) {
flb_plg_error(ins, "Failed to allocate buffer.");
return -1;
}
}

written = writer.position - writer.buffer;

if (written == 0) {
mpack_writer_destroy(&writer);
return -1;
}

flb_input_log_append(ins, NULL, 0, writer.buffer, written);
mpack_writer_destroy(&writer);

return 0;
}

/* Initialize plugin */
@@ -176,27 +209,18 @@ static int in_kafka_init(struct flb_input_instance *ins,
rd_kafka_topic_partition_list_destroy(kafka_topics);
kafka_topics = NULL;

/* create worker thread */
ret = flb_input_thread_init(&ctx->it, in_kafka_callback, &ctx->it);
if (ret) {
flb_errno();
flb_plg_error(ins, "Could not initialize worker thread");
goto init_error;
}

/* Set the context */
flb_input_set_context(ins, &ctx->it);

flb_input_set_context(ins, ctx);
/* Collect upon data available on the pipe read fd */
ret = flb_input_set_collector_event(ins,
flb_input_thread_collect,
ctx->it.read,
config);
if (ret == -1) {
flb_plg_error(ins, "Could not set collector for thread dummy input plugin");

ret = flb_input_set_collector_time(ins,
in_kafka_collect,
0, 5e8,
config);
if (ret) {
flb_plg_error(ctx->ins, "could not set collector for kafka input plugin");
goto init_error;
}
ctx->it.coll_fd = ret;

return 0;

@@ -206,7 +230,8 @@ static int in_kafka_init(struct flb_input_instance *ins,
}
if (ctx->kafka.rk) {
rd_kafka_destroy(ctx->kafka.rk);
} else if (kafka_conf) {
}
else if (kafka_conf) {
// conf is already destroyed when rd_kafka is initialized
rd_kafka_conf_destroy(kafka_conf);
}
@@ -218,16 +243,13 @@ static int in_kafka_init(struct flb_input_instance *ins,
/* Cleanup serial input */
static int in_kafka_exit(void *in_context, struct flb_config *config)
{
struct flb_input_thread *it;
struct flb_in_kafka_config *ctx;

if (!in_context) {
return 0;
}

it = in_context;
ctx = (in_context - offsetof(struct flb_in_kafka_config, it));
flb_input_thread_destroy(it, ctx->ins);
ctx = in_context;
rd_kafka_destroy(ctx->kafka.rk);
flb_free(ctx->kafka.brokers);
flb_free(ctx);
@@ -241,7 +263,7 @@ struct flb_input_plugin in_kafka_plugin = {
.description = "Kafka consumer input plugin",
.cb_init = in_kafka_init,
.cb_pre_run = NULL,
.cb_collect = flb_input_thread_collect,
.cb_collect = in_kafka_collect,
.cb_flush_buf = NULL,
.cb_exit = in_kafka_exit
};
1 change: 0 additions & 1 deletion plugins/in_kafka/in_kafka.h
@@ -30,7 +30,6 @@
struct flb_in_kafka_config {
struct flb_kafka kafka;
struct flb_input_instance *ins;
struct flb_input_thread it;
struct flb_log_event_encoder *log_encoder;
};

