From 2ef65d19d76735e2d1723410bd50c3e2aaa348d9 Mon Sep 17 00:00:00 2001 From: Dennis Kaarsemaker Date: Tue, 12 Nov 2024 08:02:35 -0800 Subject: [PATCH] out_kafka: emit a metric when librdkafka signals an error The async nature of the kafka output makes the fluentbit_output_dropped_records_total insufficient to determine whether there are problems sending messages to kafka. fluent-bit considers a message delivered when it has been added to the librdkafka buffers. If librdkafka subsequently fails to deliver the message, the only feedback is a log message such as: ``` [2024/11/12 07:56:45] [ warn] [output:kafka:azure] message delivery failed: Local: Message timed out ``` So let's add a metric that exposes how often librdkafka signals that it has problems talking to kafka. Signed-off-by: Dennis Kaarsemaker --- plugins/out_kafka/kafka.c | 3 +++ plugins/out_kafka/kafka_config.c | 6 ++++++ plugins/out_kafka/kafka_config.h | 1 + 3 files changed, 10 insertions(+) diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 95fa5cd3e11..c28687d1b7c 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -34,6 +34,9 @@ void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, if (rkmessage->err) { flb_plg_warn(ctx->ins, "message delivery failed: %s", rd_kafka_err2str(rkmessage->err)); +#ifdef FLB_HAVE_METRICS + cmt_counter_inc(ctx->cmt_kafka_errors, cfl_time_now(), 1, (char *[]){ctx->ins->alias}); +#endif } else { flb_plg_debug(ctx->ins, "message delivered (%zd bytes, " diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index 0caea60d06d..fd02d871d0a 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -211,6 +211,12 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, } } + /* Metrics */ +#ifdef FLB_HAVE_METRICS + ctx->cmt_kafka_errors = cmt_counter_create(ctx->ins->cmt, "fluentbit", "output", "kafka_errors_total", "Number of kafka errors processing queued messages", 1, (char*[]) {"name"}); + cmt_counter_set(ctx->cmt_kafka_errors, cfl_time_now(), 0, 1, (char *[]){ctx->ins->alias}); +#endif + flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp); #ifdef FLB_HAVE_AVRO_ENCODER flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 14e036f8184..b14e5801bc5 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -124,6 +124,7 @@ struct flb_out_kafka { struct flb_avro_fields avro_fields; #endif + struct cmt_counter *cmt_kafka_errors; }; struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,