Skip to content

Commit

Permalink
opentelemetry: initial addition of the experimental otlp profile signal
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich committed Nov 12, 2024
1 parent 58cc2d5 commit 0fe73c5
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 0 deletions.
8 changes: 8 additions & 0 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,14 @@ static struct flb_config_map config_map[] = {
NULL
},

{
FLB_CONFIG_MAP_BOOL, "profiles_support", "false",
0, FLB_TRUE, offsetof(struct flb_opentelemetry, profile_support_enabled),
"This is an experimental feature whoses specification is not stable yet, " \
"feel free to test it but please do not enable this in production " \
"environments"
},

{
FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE,
0, FLB_TRUE, offsetof(struct flb_opentelemetry, buffer_max_size),
Expand Down
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct flb_opentelemetry {
int raw_traces;
int tag_from_uri;
flb_sds_t logs_metadata_key;
int profile_support_enabled;

struct flb_input_instance *ins;

Expand Down
157 changes: 157 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

#include <monkey/mk_core.h>
#include <cmetrics/cmt_decode_opentelemetry.h>
#include <cprofiles/cprof_decode_opentelemetry.h>
#include <cprofiles/cprof_encode_text.h>

#include <fluent-otel-proto/fluent-otel.h>
#include "opentelemetry.h"
Expand Down Expand Up @@ -2431,6 +2433,146 @@ static int process_payload_logs_ng(struct flb_opentelemetry *ctx,
return ret;
}

static int process_payload_profiles_ng(struct flb_opentelemetry *ctx,
flb_sds_t tag,
struct flb_http_request *request,
struct flb_http_response *response)
{
cfl_sds_t text_encoded_profiles_context;
struct cprof *profiles_context;
struct flb_log_event_encoder *encoder;
size_t offset;
int ret;

encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2);

if (encoder == NULL) {
return -1;
}

if (request->content_type == NULL) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] content type missing");

return -1;
}
else if (strcasecmp(request->content_type, "application/json") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/x-protobuf") == 0) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] unsuported profiles encoding type : %s",
request->content_type);

return -1;
}
else if (strcasecmp(request->content_type, "application/grpc") == 0) {
if (cfl_sds_len(request->body) < 5) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] malformed grpc packet of size %zu",
cfl_sds_len(request->body));

return -1;
}

profiles_context = NULL;
offset = 0;

ret = cprof_decode_opentelemetry_create(&profiles_context,
&((uint8_t *) request->body)[5],
(cfl_sds_len(request->body)) - 5,
&offset);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile decoding error : %d",
ret);

return -1;
}

ret = cprof_encode_text_create(&text_encoded_profiles_context, profiles_context);

cprof_decode_opentelemetry_destroy(profiles_context);

if (ret != CPROF_DECODE_OPENTELEMETRY_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] profile text encoding error : %d",
ret);

return -1;
}

flb_log_event_encoder_begin_record(encoder);

flb_log_event_encoder_set_current_timestamp(encoder);

ret = flb_log_event_encoder_append_body_values(
encoder,
FLB_LOG_EVENT_CSTRING_VALUE("Profile"),
FLB_LOG_EVENT_STRING_VALUE(text_encoded_profiles_context,
cfl_sds_len(text_encoded_profiles_context)));

cprof_encode_text_destroy(text_encoded_profiles_context);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_log_event_encoder_commit_record(encoder);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = flb_input_log_append(ctx->ins,
tag,
flb_sds_len(tag),
encoder->output_buffer,
encoder->output_length);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);

flb_error("[otel] re encoded profile ingestion error : %d",
ret);

return -1;
}

ret = 0;
}
else {
flb_plg_error(ctx->ins, "Unsupported content type %s", request->content_type);

ret = -1;
}


flb_log_event_encoder_destroy(encoder);

return ret;
}

static int send_export_service_response_ng(struct flb_http_response *response,
int result,
Expand Down Expand Up @@ -2478,6 +2620,10 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request,

grpc_request = FLB_TRUE;
}
else if (context->profile_support_enabled &&
strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) {
grpc_request = FLB_TRUE;
}
else {
send_response_ng(response, 400, "error: invalid endpoint\n");
return -1;
Expand Down Expand Up @@ -2531,6 +2677,17 @@ int opentelemetry_prot_handle_ng(struct flb_http_request *request,
}
result = process_payload_logs_ng(context, tag, request, response);
}
else if (context->profile_support_enabled &&
strcmp(request->path, "/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export") == 0) {
payload_type = 'P';
if (context->tag_from_uri == FLB_TRUE) {
tag = flb_sds_create("v1development_profiles");
}
else {
tag = flb_sds_create(context->ins->tag);
}
result = process_payload_profiles_ng(context, tag, request, response);
}

if (grpc_request) {
send_export_service_response_ng(response, result, payload_type);
Expand Down

0 comments on commit 0fe73c5

Please sign in to comment.