Skip to content

Commit

Permalink
Update otel proto buf specification
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomas Longo committed Feb 13, 2025
1 parent 50f7a39 commit 862fbe2
Show file tree
Hide file tree
Showing 25 changed files with 97 additions and 829 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Sum;
Expand Down Expand Up @@ -407,24 +405,9 @@ private ExportTraceServiceRequest createExportTraceRequest() {
.build())
.build();

final InstrumentationLibrarySpans ilSpans = InstrumentationLibrarySpans.newBuilder()
.setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
.setName(ilName)
.setVersion(ilVersion)
.build())
.addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
.setTraceId(ByteString.copyFrom(TraceId2.getBytes()))
.setSpanId(ByteString.copyFrom(SpanId2.getBytes()))
.setKind(io.opentelemetry.proto.trace.v1.Span.SpanKind.SPAN_KIND_INTERNAL)
.setName(ilSpanName)
.setStartTimeUnixNano(currentUnixTimeNano)
.setEndTimeUnixNano(currentUnixTimeNano+TIME_DELTA*1000_000_000)
.build())
.build();
ResourceSpans resourceSpans = ResourceSpans.newBuilder()
.setResource(resource)
.addScopeSpans(scopeSpans)
.addInstrumentationLibrarySpans(ilSpans)
.build();

return ExportTraceServiceRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import io.micrometer.core.instrument.Timer;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.logs.v1.InstrumentationLibraryLogs;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -61,9 +62,8 @@
public class OTelLogsGrpcServiceTest {
private static final ExportLogsServiceRequest LOGS_REQUEST = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(ResourceLogs.newBuilder()
.addInstrumentationLibraryLogs(InstrumentationLibraryLogs.newBuilder()
.addLogRecords(LogRecord.newBuilder())
.build())).build();
.addScopeLogs(ScopeLogs.newBuilder().addLogRecords(LogRecord.newBuilder()) .build()))
.build();

private static PluginSetting pluginSetting;
private final int bufferWriteTimeoutInMillis = 100000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,21 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;
import org.apache.commons.codec.binary.Hex;
import org.opensearch.dataprepper.model.metric.Bucket;
import org.opensearch.dataprepper.model.metric.DefaultBucket;
import org.opensearch.dataprepper.model.metric.DefaultExemplar;
import org.opensearch.dataprepper.model.metric.DefaultQuantile;
import org.opensearch.dataprepper.model.metric.Exemplar;
import org.opensearch.dataprepper.model.metric.Quantile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -40,14 +31,7 @@ public final class OTelMetricsProtoHelper {

private static final Logger LOG = LoggerFactory.getLogger(OTelMetricsProtoHelper.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String SERVICE_NAME = "service.name";
private static final String METRIC_ATTRIBUTES = "metric.attributes";
static final String RESOURCE_ATTRIBUTES = "resource.attributes";
static final String EXEMPLAR_ATTRIBUTES = "exemplar.attributes";
static final String INSTRUMENTATION_LIBRARY_NAME = "instrumentationLibrary.name";
static final String INSTRUMENTATION_LIBRARY_VERSION = "instrumentationLibrary.version";
static final String INSTRUMENTATION_SCOPE_NAME = "instrumentationScope.name";
static final String INSTRUMENTATION_SCOPE_VERSION = "instrumentationScope.version";

/**
* To make it ES friendly we will replace '.' in keys with '@' in all the Keys in {@link io.opentelemetry.proto.common.v1.KeyValue}
Expand All @@ -60,8 +44,6 @@ public final class OTelMetricsProtoHelper {
* Span and Resource attributes are essential for kibana so they should not be nested. SO we will prefix them with "metric.attributes"
* and "resource.attributes" and "exemplar.attributes".
*/
public static final Function<String, String> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> METRIC_ATTRIBUTES + DOT + i.replace(DOT, AT);
public static final Function<String, String> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> RESOURCE_ATTRIBUTES + DOT + i.replace(DOT, AT);
public static final Function<String, String> PREFIX_AND_EXEMPLAR_ATTRIBUTES_REPLACE_DOT_WITH_AT = i -> EXEMPLAR_ATTRIBUTES + DOT + i.replace(DOT, AT);

private OTelMetricsProtoHelper() {
Expand Down Expand Up @@ -111,30 +93,6 @@ public static Object convertAnyValue(final AnyValue value) {
}
}

/**
* Converts the keys of all attributes in the {@link NumberDataPoint}.
* Also, casts the underlying data into its actual type
*
* @param numberDataPoint The point to process
* @return A Map containing all attributes of `numberDataPoint` with keys converted into an OS-friendly format
*/
public static Map<String, Object> convertKeysOfDataPointAttributes(final NumberDataPoint numberDataPoint) {
return numberDataPoint.getAttributesList().stream()
.collect(Collectors.toMap(i -> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Unpacks the List of {@link KeyValue} object into a Map.
* Converts the keys into an os friendly format and casts the underlying data into its actual type?
*
* @param attributesList The list of {@link KeyValue} objects to process
* @return A Map containing unpacked {@link KeyValue} data
*/
public static Map<String, Object> unpackKeyValueList(List<KeyValue> attributesList) {
return attributesList.stream()
.collect(Collectors.toMap(i -> PREFIX_AND_METRIC_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Unpacks the List of {@link KeyValue} object into a Map.
* Converts the keys into an os friendly format and casts the underlying data into its actual type?
Expand Down Expand Up @@ -184,79 +142,11 @@ public static Double getExemplarValueAsDouble(final io.opentelemetry.proto.metri
}
}

public static Map<String, Object> getResourceAttributes(final Resource resource) {
return resource.getAttributesList().stream()
.collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Extracts the name and version of the used instrumentation library used
*
* @param instrumentationLibrary instrumentationLibrary
* @return A map, containing information about the instrumentation library
*/
public static Map<String, Object> getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) {
final Map<String, Object> instrumentationAttr = new HashMap<>();
if (!instrumentationLibrary.getName().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName());
}
if (!instrumentationLibrary.getVersion().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion());
}
return instrumentationAttr;
}

/**
* Extracts the name and version of the used instrumentation scope used
*
* @param instrumentationScope instrumentationScope
* @return A map, containing information about the instrumentation scope
*/
public static Map<String, Object> getInstrumentationScopeAttributes(final InstrumentationScope instrumentationScope) {
final Map<String, Object> instrumentationScopeAttr = new HashMap<>();
if (!instrumentationScope.getName().isEmpty()) {
instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_NAME, instrumentationScope.getName());
}
if (!instrumentationScope.getVersion().isEmpty()) {
instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationScope.getVersion());
}
return instrumentationScopeAttr;
}


public static String convertUnixNanosToISO8601(final long unixNano) {
return Instant.ofEpochSecond(0L, unixNano).toString();
}

public static String getStartTimeISO8601(final NumberDataPoint numberDataPoint) {
return convertUnixNanosToISO8601(numberDataPoint.getStartTimeUnixNano());
}

public static String getTimeISO8601(final NumberDataPoint ndp) {
return convertUnixNanosToISO8601(ndp.getTimeUnixNano());
}

public static Optional<String> getServiceName(final Resource resource) {
return resource.getAttributesList().stream()
.filter(keyValue -> keyValue.getKey().equals(SERVICE_NAME) && !keyValue.getValue().getStringValue().isEmpty())
.findFirst()
.map(i -> i.getValue().getStringValue());
}


public static Map<String, Object> mergeAllAttributes(final Collection<Map<String, Object>> attributes) {
return attributes.stream()
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}


public static List<Quantile> getQuantileValues(List<SummaryDataPoint.ValueAtQuantile> quantileValues) {
return quantileValues.stream()
.map(q -> new DefaultQuantile(q.getQuantile(), q.getValue()))
.collect(Collectors.toList());
}

/**
* Create the buckets, see <a href="https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/metrics/v1/metrics.proto">
* the OTel metrics proto spec</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -139,7 +139,7 @@ private ExportMetricsServiceRequest fillServiceRequest(ExponentialHistogram hist
.setName("name")
.setDescription("description")
.build();
InstrumentationLibraryMetrics instLib = InstrumentationLibraryMetrics.newBuilder()
ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
.addMetrics(metric).build();

Resource resource = Resource.newBuilder()
Expand All @@ -149,7 +149,7 @@ private ExportMetricsServiceRequest fillServiceRequest(ExponentialHistogram hist
).build();
ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.setResource(resource)
.addInstrumentationLibraryMetrics(instLib)
.addScopeMetrics(scopeMetrics)
.build();
return ExportMetricsServiceRequest.newBuilder().addResourceMetrics(resourceMetrics).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
import com.google.protobuf.ByteString;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Exemplar;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
Expand Down Expand Up @@ -71,60 +69,6 @@ void init() {
rawProcessor = new OTelMetricsRawProcessor(testsettings, new OtelMetricsRawProcessorConfig());
}

@Test
void testInstrumentationLibrary() throws JsonProcessingException {
NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);
Gauge gauge = Gauge.newBuilder().addDataPoints(p1).build();

io.opentelemetry.proto.metrics.v1.Metric.Builder metric = io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
.setGauge(gauge)
.setUnit("seconds")
.setName("name")
.setDescription("description");

InstrumentationLibraryMetrics isntLib = InstrumentationLibraryMetrics.newBuilder()
.addMetrics(metric)
.setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
.setName("ilname")
.setVersion("ilversion")
.build())
.build();

Resource resource = Resource.newBuilder()
.addAttributes(KeyValue.newBuilder()
.setKey("service.name")
.setValue(AnyValue.newBuilder().setStringValue("service").build())
).build();

ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.addInstrumentationLibraryMetrics(isntLib)
.setResource(resource)
.build();

ExportMetricsServiceRequest exportMetricRequest = ExportMetricsServiceRequest.newBuilder()
.addResourceMetrics(resourceMetrics).build();

Record<ExportMetricsServiceRequest> record = new Record<>(exportMetricRequest);

Collection<Record<? extends Metric>> records = rawProcessor.doExecute(Collections.singletonList(record));
List<Record<? extends Metric>> list = new ArrayList<>(records);

Record<? extends Metric> dataPrepperResult = list.get(0);
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = objectMapper.readValue(dataPrepperResult.getData().toJsonString(), Map.class);
assertThat(map).contains(entry("kind", Metric.KIND.GAUGE.toString()));
assertThat(map).contains(entry("unit", "seconds"));
assertThat(map).contains(entry("serviceName", "service"));
assertThat(map).contains(entry("resource.attributes.service@name", "service"));
assertThat(map).contains(entry("description", "description"));
assertThat(map).contains(entry("value", 4.0D));
assertThat(map).contains(entry("startTime", "1970-01-01T00:00:00Z"));
assertThat(map).contains(entry("time", "1970-01-01T00:00:00Z"));
assertThat(map).contains(entry("instrumentationLibrary.name", "ilname"));
assertThat(map).contains(entry("instrumentationLibrary.version", "ilversion"));

}

@Test
void testScopeMetricsLibrary() throws JsonProcessingException {
NumberDataPoint.Builder p1 = NumberDataPoint.newBuilder().setAsInt(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -119,7 +119,7 @@ private ExportMetricsServiceRequest fillServiceRequest(Histogram histogram) {
.setName("name")
.setDescription("description")
.build();
InstrumentationLibraryMetrics instLib = InstrumentationLibraryMetrics.newBuilder()
ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
.addMetrics(metric).build();

Resource resource = Resource.newBuilder()
Expand All @@ -129,7 +129,7 @@ private ExportMetricsServiceRequest fillServiceRequest(Histogram histogram) {
).build();
ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
.setResource(resource)
.addInstrumentationLibraryMetrics(instLib)
.addScopeMetrics(scopeMetrics)
.build();
return ExportMetricsServiceRequest.newBuilder().addResourceMetrics(resourceMetrics).build();
}
Expand Down
Loading

0 comments on commit 862fbe2

Please sign in to comment.