Skip to content

Commit

Permalink
Merge pull request #142 from warpstreamlabs/ra/fix-parquet-stuff
Browse files Browse the repository at this point in the history
Improve parquet handling of optional decimals and float <> float conversions
  • Loading branch information
richardartoul authored Oct 28, 2024
2 parents 1bb15f8 + e8beaf0 commit c625815
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 62 deletions.
43 changes: 17 additions & 26 deletions internal/impl/parquet/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package parquet
import (
"errors"
"fmt"
"math"
"reflect"
)

Expand Down Expand Up @@ -84,10 +85,11 @@ func setField(field reflect.Value, value any) error {
}
return fmt.Errorf("cannot represent %v as int32", value)
case float64:
if float64(int32(v)) == v {
if v >= math.MinInt32 && v <= math.MaxInt32 {
field.SetInt(int64(v))
return nil
}

return fmt.Errorf("cannot represent %v as int32", value)
default:
return fmt.Errorf("cannot convert %T to int64", value)
Expand All @@ -100,6 +102,10 @@ func setField(field reflect.Value, value any) error {
case int64:
field.SetInt(v)
case float64:
if v >= math.MinInt64 && v <= math.MaxInt64 {
field.SetInt(int64(v))
return nil
}
field.SetInt(int64(v))
default:
return fmt.Errorf("cannot convert %T to int64", value)
Expand All @@ -112,40 +118,25 @@ func setField(field reflect.Value, value any) error {
case float64:
field.SetFloat(v)
case int:
if int(float64(v)) == v {
field.SetFloat(float64(v))
return nil
}
return fmt.Errorf("cannot represent %v as float64", value)
field.SetFloat(float64(v))
return nil
case int64:
if int64(float64(v)) == v {
field.SetFloat(float64(v))
return nil
}
return fmt.Errorf("cannot represent %v as float64", value)
field.SetFloat(float64(v))
return nil
default:
return fmt.Errorf("cannot convert %T to float64", value)
}
case reflect.Float32:
switch v := value.(type) {
case float64:
if float64(float32(v)) == v {
field.SetFloat(v)
return nil
}
return fmt.Errorf("cannot represent %v as float32", value)
field.SetFloat(v)
return nil
case int:
if int(float32(v)) == v {
field.SetFloat(float64(v))
return nil
}
return fmt.Errorf("cannot represent %v as float32", value)
field.SetFloat(float64(v))
return nil
case int64:
if int64(float32(v)) == v {
field.SetFloat(float64(v))
return nil
}
return fmt.Errorf("cannot represent %v as float32", value)
field.SetFloat(float64(v))
return nil
default:
return fmt.Errorf("cannot convert %T to float64", value)
}
Expand Down
27 changes: 24 additions & 3 deletions internal/impl/parquet/processor_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,36 @@ func newParquetEncodeProcessorFromConfig(
return nil, fmt.Errorf("default_compression type %v not recognised", compressStr)
}

schemaType, err := GenerateStructType(conf)
// For the schema, we don't want any of the actual values encoded as pointers. That works
// for 99% of things, but it doesn't work for decimal types, so we use the optional struct
// tags approach which works for 100% of things. This is fine because even though the
// optional struct tags approach can't represent null values, this is just defining the schema
// so we don't care if the struct value is a concrete type or a pointer to a concrete type.
schemaType, err := GenerateStructType(conf, schemaOpts{
optionalsAsStructTags: true,
optionalAsPtrs: false,
})
if err != nil {
return nil, fmt.Errorf(
"failed to generate struct type from parquet schema: %w", err)
"failed to generate struct type from parquet schema(schema): %w", err)
}

// For the actual *struct values* that we're going to pass to the parquet encoder, we use
// the pointer approach. This is fine because this struct won't be passed to parquet.SchemaOf()
// so it won't trigger the panic in that function. Ensuring the struct used to represent
// parquet rows uses pointers for optionals ensures that we can properly represent NULL values.
messageType, err := GenerateStructType(conf, schemaOpts{
optionalsAsStructTags: false,
optionalAsPtrs: true,
})
if err != nil {
return nil, fmt.Errorf(
"failed to generate struct type from parquet schema(message): %w", err)
}

schema := parquet.SchemaOf(reflect.New(schemaType).Interface())

return newParquetEncodeProcessor(logger, schema, compressDefault, schemaType)
return newParquetEncodeProcessor(logger, schema, compressDefault, messageType)
}

type parquetEncodeProcessor struct {
Expand Down
72 changes: 64 additions & 8 deletions internal/impl/parquet/processor_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"github.com/warpstreamlabs/bento/public/service"
)

func TestParquetEncodePanic(t *testing.T) {
// Before we changed this, the library used to mix and match behavior where sometimes it would quietly
// downscale the value, and other times it would not. Now we always just do a straight cast.
func TestParquetEncodeDoesNotPanic(t *testing.T) {
encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(`
schema:
- { name: id, type: FLOAT }
Expand All @@ -30,8 +32,7 @@ schema:
_, err = encodeProc.ProcessBatch(tctx, service.MessageBatch{
service.NewMessage([]byte(`{"id":1e99,"name":"foo"}`)),
})
require.Error(t, err)
assert.Contains(t, err.Error(), "cannot represent 1e+99 as float32")
require.NoError(t, err)

encodeConf, err = parquetEncodeProcessorConfig().ParseYAML(`
schema:
Expand All @@ -46,9 +47,9 @@ schema:
_, err = encodeProc.ProcessBatch(tctx, service.MessageBatch{
service.NewMessage([]byte(`{"id":1e10,"name":"foo"}`)),
})
require.Error(t, err)
assert.Contains(t, err.Error(), "cannot represent 1e+10 as int32")
require.NoError(t, err)
}

func TestParquetEncodeDecodeRoundTrip(t *testing.T) {
encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(`
schema:
Expand All @@ -65,6 +66,14 @@ schema:
fields:
- { name: a_stuff, type: BYTE_ARRAY }
- { name: b_stuff, type: BYTE_ARRAY }
- { name: h, type: DECIMAL32, decimal_precision: 3, optional: True}
- name: ob
fields:
- name: ob_name
type: UTF8
- name: bidValue
type: FLOAT
optional: true
`, nil)
require.NoError(t, err)

Expand Down Expand Up @@ -131,6 +140,14 @@ schema:
fields:
- { name: a_stuff, type: BYTE_ARRAY }
- { name: b_stuff, type: BYTE_ARRAY }
- { name: h, type: DECIMAL32, decimal_precision: 3, optional: True}
- name: ob
fields:
- name: ob_name
type: UTF8
- name: bidValue
type: FLOAT
optional: true
`, nil)
require.NoError(t, err)

Expand Down Expand Up @@ -244,7 +261,9 @@ func testParquetEncodeDecodeRoundTrip(t *testing.T, encodeProc *parquetEncodePro
"a_stuff": "a value",
"b_stuff": "b value"
},
"canary":"not in schema"
"canary":"not in schema",
"h": 1.0,
"ob":{"ob_name":"test","bidValue":0.15}
}`,
output: `{
"id": 3,
Expand All @@ -258,7 +277,9 @@ func testParquetEncodeDecodeRoundTrip(t *testing.T, encodeProc *parquetEncodePro
"nested_stuff": {
"a_stuff": "a value",
"b_stuff": "b value"
}
},
"h": 1.0,
"ob":{"ob_name":"test","bidValue":0.15}
}`,
},
{
Expand All @@ -281,7 +302,9 @@ func testParquetEncodeDecodeRoundTrip(t *testing.T, encodeProc *parquetEncodePro
"e": null,
"f": 7,
"g": "logical string represent",
"nested_stuff": null
"nested_stuff": null,
"h": null,
"ob": null
}`,
},
} {
Expand Down Expand Up @@ -561,3 +584,36 @@ schema:
require.Equal(t, schema.String(), encodeProc.schema.String())
}
}

func TestEncodeDecimalOptional(t *testing.T) {
tests := []struct {
config string
expected parquet.Node
}{
{
config: `
schema:
- name: floor_value
optional: true
type: DECIMAL32
decimal_scale: 4
decimal_precision: 8
`,
expected: parquet.Group{
"floor_value": parquet.Optional(parquet.Decimal(4, 8, parquet.Int32Type)),
},
},
}

for _, tt := range tests {
encodeConf, err := parquetEncodeProcessorConfig().ParseYAML(tt.config, nil)
require.NoError(t, err)

encodeProc, err := newParquetEncodeProcessorFromConfig(encodeConf, nil)
require.NoError(t, err)

schema := parquet.NewSchema("", tt.expected)
require.Equal(t, schema.String(), encodeProc.schema.String())
}

}
Loading

0 comments on commit c625815

Please sign in to comment.