Skip to content

Commit

Permalink
out_kafka: emit a metric when librdkafka signals an error
Browse files Browse the repository at this point in the history
The async nature of the kafka output makes the
fluentbit_output_dropped_records_total insufficient to determine whether there
are problems sending messages to kafka. fluent-bit considers a message
delivered when it has been added to the librdkafka buffers. If librdkafka
subsequently fails to deliver the message, the only feedback is a log message
such as:

```
[2024/11/12 07:56:45] [ warn] [output:kafka:azure] message delivery failed: Local: Message timed out
```

So let's add a metric that exposes how often librdkafka signals that it has
problems talking to kafka.

Signed-off-by: Dennis Kaarsemaker <[email protected]>
  • Loading branch information
seveas committed Nov 12, 2024
1 parent 58cc2d5 commit 2ef65d1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 0 deletions.
3 changes: 3 additions & 0 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
if (rkmessage->err) {
flb_plg_warn(ctx->ins, "message delivery failed: %s",
rd_kafka_err2str(rkmessage->err));
#ifdef FLB_HAVE_METRICS
cmt_counter_inc(ctx->cmt_kafka_errors, cfl_time_now(), 1, (char *[]){ctx->ins->alias});
#endif
}
else {
flb_plg_debug(ctx->ins, "message delivered (%zd bytes, "
Expand Down
6 changes: 6 additions & 0 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
}
}

/* Metrics */
#ifdef FLB_HAVE_METRICS
ctx->cmt_kafka_errors = cmt_counter_create(ctx->ins->cmt, "fluentbit", "output", "kafka_errors_total", "Number of kafka errors processing queued messages", 1, (char*[]) {"name"});
cmt_counter_set(ctx->cmt_kafka_errors, cfl_time_now(), 0, 1, (char *[]){ctx->ins->alias});
#endif

flb_plg_info(ctx->ins, "brokers='%s' topics='%s'", ctx->kafka.brokers, tmp);
#ifdef FLB_HAVE_AVRO_ENCODER
flb_plg_info(ctx->ins, "schemaID='%s' schema='%s'", ctx->avro_fields.schema_id, ctx->avro_fields.schema_str);
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct flb_out_kafka {
struct flb_avro_fields avro_fields;
#endif

struct cmt_counter *cmt_kafka_errors;
};

struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
Expand Down

0 comments on commit 2ef65d1

Please sign in to comment.