Measure time spent on encoding and the compaction ratio (#871)
Add metrics to measure the time spent on CBOR and ZSTD encoding/decoding, and the
compression ratio achieved by ZSTD.

Fixes #863
masih authored Feb 4, 2025
1 parent 30fac2e commit e232882
Showing 4 changed files with 127 additions and 11 deletions.
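
The histograms added below are created from the global otel.Meter, so they only surface if the host application installs a meter provider. Not part of this commit, but for context, a minimal sketch (assuming the OpenTelemetry Prometheus exporter; the port is illustrative) of wiring one up so the new encoding metrics become scrapeable:

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/prometheus"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// The Prometheus exporter doubles as a metric reader for the SDK.
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatal(err)
	}
	// Install a global meter provider; otel.Meter("f3/internal/encoding") in
	// metrics.go below resolves against it, so the encoding histograms are exported.
	otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter)))

	// Serve the scrape endpoint (address is illustrative).
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
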
75 changes: 64 additions & 11 deletions internal/encoding/encoding.go
@@ -2,11 +2,16 @@ package encoding

import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/klauspost/compress/zstd"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// maxDecompressedSize is the default maximum amount of memory allocated by the
@@ -37,15 +42,29 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] {
return &CBOR[T]{}
}

func (c *CBOR[T]) Encode(m T) ([]byte, error) {
func (c *CBOR[T]) Encode(m T) (_ []byte, _err error) {
defer func(start time.Time) {
if _err != nil {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetCborEncode))
}
}(time.Now())
var out bytes.Buffer
if err := m.MarshalCBOR(&out); err != nil {
return nil, err
}
return out.Bytes(), nil
}

func (c *CBOR[T]) Decode(v []byte, t T) error {
func (c *CBOR[T]) Decode(v []byte, t T) (_err error) {
defer func(start time.Time) {
if _err != nil {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetCborDecode))
}
}(time.Now())
r := bytes.NewReader(v)
return t.UnmarshalCBOR(r)
}
@@ -54,6 +73,9 @@ type ZSTD[T CBORMarshalUnmarshaler] struct {
cborEncoding *CBOR[T]
compressor *zstd.Encoder
decompressor *zstd.Decoder

metricAttr attribute.KeyValue
metricAttrLoader sync.Once
}

func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
@@ -74,26 +96,57 @@ func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
}, nil
}

func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
cborEncoded, err := c.cborEncoding.Encode(m)
if len(cborEncoded) > maxDecompressedSize {
func (c *ZSTD[T]) Encode(t T) (_ []byte, _err error) {
defer func(start time.Time) {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetZstdEncode))
}(time.Now())
decompressed, err := c.cborEncoding.Encode(t)
if len(decompressed) > maxDecompressedSize {
// Error out early if the encoded value is too large to be decompressed.
return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(cborEncoded), maxDecompressedSize)
return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(decompressed), maxDecompressedSize)
}
if err != nil {
return nil, err
}
compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
compressed := c.compressor.EncodeAll(decompressed, make([]byte, 0, len(decompressed)))
c.meterCompressionRatio(len(decompressed), len(compressed))
return compressed, nil
}

func (c *ZSTD[T]) Decode(v []byte, t T) error {
func (c *ZSTD[T]) Decode(compressed []byte, t T) (_err error) {
defer func(start time.Time) {
if _err != nil {
metrics.encodingTime.Record(context.Background(),
time.Since(start).Seconds(),
metric.WithAttributeSet(attrSetZstdDecode))
}
}(time.Now())
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)

cborEncoded, err := c.decompressor.DecodeAll(v, (*buf)[:0])
decompressed, err := c.decompressor.DecodeAll(compressed, (*buf)[:0])
if err != nil {
return err
}
return c.cborEncoding.Decode(cborEncoded, t)
c.meterCompressionRatio(len(decompressed), len(compressed))
return c.cborEncoding.Decode(decompressed, t)
}

func (c *ZSTD[T]) meterCompressionRatio(decompressedSize, compressedSize int) {
compressionRatio := float64(decompressedSize) / float64(compressedSize)
metrics.zstdCompressionRatio.Record(context.Background(), compressionRatio, metric.WithAttributes(c.getMetricAttribute()))
}

func (c *ZSTD[T]) getMetricAttribute() attribute.KeyValue {
c.metricAttrLoader.Do(func() {
const key = "type"
switch target := reflect.TypeFor[T](); {
case target.Kind() == reflect.Ptr:
c.metricAttr = attribute.String(key, target.Elem().Name())
default:
c.metricAttr = attribute.String(key, target.Name())
}
})
return c.metricAttr
}
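
meterCompressionRatio records the uncompressed (CBOR) size divided by the compressed (zstd) size, so values above 1.0 mean the payload shrank. A minimal sketch of the arithmetic with hypothetical sizes, not taken from this commit:

package main

import "fmt"

func main() {
	// Hypothetical sizes for illustration: a 2048-byte CBOR payload that zstd
	// compresses to 512 bytes is recorded as a ratio of 2048/512 = 4.0;
	// a ratio below 1.0 would mean the compressed form is larger than the input.
	decompressedSize, compressedSize := 2048, 512
	ratio := float64(decompressedSize) / float64(compressedSize)
	fmt.Println(ratio) // 4
}
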
7 changes: 7 additions & 0 deletions internal/encoding/encoding_api_test.go
@@ -0,0 +1,7 @@
package encoding

import "go.opentelemetry.io/otel/attribute"

// GetMetricAttribute returns the attribute for metric collection, exported for
// testing purposes.
func (c *ZSTD[T]) GetMetricAttribute() attribute.KeyValue { return c.getMetricAttribute() }
18 changes: 18 additions & 0 deletions internal/encoding/encoding_test.go
@@ -9,6 +9,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel/attribute"
)

var (
@@ -77,3 +78,20 @@ func TestZSTDLimits(t *testing.T) {
var dest testValue
require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit")
}

func TestZSTD_GetMetricAttribute(t *testing.T) {
t.Run("By Pointer", func(t *testing.T) {
subject, err := encoding.NewZSTD[*testValue]()
require.NoError(t, err)
require.Equal(t, attribute.String("type", "testValue"), subject.GetMetricAttribute())
})
t.Run("By Value", func(t *testing.T) {
type anotherTestValue struct {
cbg.CBORUnmarshaler
cbg.CBORMarshaler
}
subject, err := encoding.NewZSTD[anotherTestValue]()
require.NoError(t, err)
require.Equal(t, attribute.String("type", "anotherTestValue"), subject.GetMetricAttribute())
})
}
38 changes: 38 additions & 0 deletions internal/encoding/metrics.go
@@ -0,0 +1,38 @@
package encoding

import (
"github.com/filecoin-project/go-f3/internal/measurements"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
attrCodecCbor = attribute.String("codec", "cbor")
attrCodecZstd = attribute.String("codec", "zstd")
attrActionEncode = attribute.String("action", "encode")
attrActionDecode = attribute.String("action", "decode")
attrSetCborEncode = attribute.NewSet(attrCodecCbor, attrActionEncode)
attrSetCborDecode = attribute.NewSet(attrCodecCbor, attrActionDecode)
attrSetZstdEncode = attribute.NewSet(attrCodecZstd, attrActionEncode)
attrSetZstdDecode = attribute.NewSet(attrCodecZstd, attrActionDecode)

meter = otel.Meter("f3/internal/encoding")

metrics = struct {
encodingTime metric.Float64Histogram
zstdCompressionRatio metric.Float64Histogram
}{
encodingTime: measurements.Must(meter.Float64Histogram(
"f3_internal_encoding_time",
metric.WithDescription("The time spent on encoding/decoding in seconds."),
metric.WithUnit("s"),
metric.WithExplicitBucketBoundaries(0.001, 0.003, 0.005, 0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0),
)),
zstdCompressionRatio: measurements.Must(meter.Float64Histogram(
"f3_internal_encoding_zstd_compression_ratio",
metric.WithDescription("The ratio of compressed to uncompressed data size for zstd encoding."),
metric.WithExplicitBucketBoundaries(0.0, 0.1, 0.2, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0),
)),
}
)
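
measurements.Must is an internal go-f3 helper that is not part of this diff; assuming it follows the common Must pattern, it panics if instrument construction fails so the metrics struct can be initialised in a package-level var block. A hedged sketch of that assumption:

package measurements

// Must is sketched here under the assumption that it simply unwraps an
// (instrument, error) pair: it panics on a non-nil error and otherwise
// returns the instrument unchanged.
func Must[V any](v V, err error) V {
	if err != nil {
		panic(err)
	}
	return v
}
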
