Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3115-Fix int96 read issue in complex type #3118

Merged
merged 5 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
Expand All @@ -29,8 +30,9 @@ class AvroCompatRecordMaterializer<T extends IndexedRecord> extends RecordMateri

private AvroIndexedRecordConverter<T> root;

public AvroCompatRecordMaterializer(MessageType requestedSchema, Schema avroSchema, GenericData baseModel) {
this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
public AvroCompatRecordMaterializer(
MessageType requestedSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) {
this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel, conf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.parquet.Preconditions;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
Expand Down Expand Up @@ -57,20 +58,26 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
private final GenericData model;
private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();

public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
this(parquetSchema, avroSchema, SpecificData.get());
public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, ParquetConfiguration conf) {
this(parquetSchema, avroSchema, SpecificData.get(), conf);
}

public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) {
this(null, parquetSchema, avroSchema, baseModel);
public AvroIndexedRecordConverter(
MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) {
this(null, parquetSchema, avroSchema, baseModel, conf);
}

public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema) {
this(parent, parquetSchema, avroSchema, SpecificData.get());
public AvroIndexedRecordConverter(
ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, ParquetConfiguration conf) {
this(parent, parquetSchema, avroSchema, SpecificData.get(), conf);
}

public AvroIndexedRecordConverter(
ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, GenericData baseModel) {
ParentValueContainer parent,
GroupType parquetSchema,
Schema avroSchema,
GenericData baseModel,
ParquetConfiguration conf) {
this.parent = parent;
this.avroSchema = avroSchema;
int schemaSize = parquetSchema.getFieldCount();
Expand All @@ -89,13 +96,17 @@ public AvroIndexedRecordConverter(
Schema.Field avroField = getAvroField(parquetField.getName());
Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
final int finalAvroIndex = avroFieldIndexes.remove(avroField.name());
converters[parquetFieldIndex++] =
newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() {
converters[parquetFieldIndex++] = newConverter(
nonNullSchema,
parquetField,
model,
new ParentValueContainer() {
@Override
public void add(Object value) {
AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
}
});
},
conf);
}
// store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema)
for (String fieldName : avroFieldIndexes.keySet()) {
Expand Down Expand Up @@ -137,7 +148,8 @@ private Schema.Field getAvroField(String parquetFieldName) {
return avroField;
}

private static Converter newConverter(Schema schema, Type type, GenericData model, ParentValueContainer setter) {
private static Converter newConverter(
Schema schema, Type type, GenericData model, ParentValueContainer setter, ParquetConfiguration conf) {

LogicalType logicalType = schema.getLogicalType();
// the expected type is always null because it is determined by the parent
Expand All @@ -148,7 +160,7 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode

switch (schema.getType()) {
case ARRAY:
return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
return new AvroArrayConverter(parent, type.asGroupType(), schema, model, conf);
case BOOLEAN:
return new AvroConverters.FieldBooleanConverter(parent);
case BYTES:
Expand All @@ -166,13 +178,13 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode
case LONG:
return new AvroConverters.FieldLongConverter(parent);
case MAP:
return new MapConverter(parent, type.asGroupType(), schema, model);
return new MapConverter(parent, type.asGroupType(), schema, model, conf);
case RECORD:
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model, conf);
case STRING:
return new AvroConverters.FieldStringConverter(parent);
case UNION:
return new AvroUnionConverter(parent, type, schema, model);
return new AvroUnionConverter(parent, type, schema, model, conf);
case NULL: // fall through
default:
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -317,25 +329,35 @@ static final class AvroArrayConverter extends GroupConverter {
private final Converter converter;
private GenericArray<Object> array;

public AvroArrayConverter(ParentValueContainer parent, GroupType type, Schema avroSchema, GenericData model) {
public AvroArrayConverter(
ParentValueContainer parent,
GroupType type,
Schema avroSchema,
GenericData model,
ParquetConfiguration conf) {
this.parent = parent;
this.avroSchema = avroSchema;
Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType());
Type repeatedType = type.getType(0);
// always determine whether the repeated type is the element type by
// matching it against the element schema.
if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) {
if (AvroRecordConverter.isElementType(repeatedType, elementSchema, conf)) {
// the element type is the repeated type (and required)
converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
@Override
@SuppressWarnings("unchecked")
public void add(Object value) {
array.add(value);
}
});
converter = newConverter(
elementSchema,
repeatedType,
model,
new ParentValueContainer() {
@Override
@SuppressWarnings("unchecked")
public void add(Object value) {
array.add(value);
}
},
conf);
} else {
// the element is wrapped in a synthetic group and may be optional
converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model, conf);
}
}

Expand Down Expand Up @@ -369,16 +391,21 @@ final class ElementConverter extends GroupConverter {
private Object element;
private final Converter elementConverter;

public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
public ElementConverter(
GroupType repeatedType, Schema elementSchema, GenericData model, ParquetConfiguration conf) {
Type elementType = repeatedType.getType(0);
Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
this.elementConverter =
newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
this.elementConverter = newConverter(
nonNullElementSchema,
elementType,
model,
new ParentValueContainer() {
@Override
public void add(Object value) {
ElementConverter.this.element = value;
}
});
},
conf);
}

@Override
Expand Down Expand Up @@ -406,7 +433,11 @@ static final class AvroUnionConverter extends GroupConverter {
private Object memberValue = null;

public AvroUnionConverter(
ParentValueContainer parent, Type parquetSchema, Schema avroSchema, GenericData model) {
ParentValueContainer parent,
Type parquetSchema,
Schema avroSchema,
GenericData model,
ParquetConfiguration conf) {
this.parent = parent;
GroupType parquetGroup = parquetSchema.asGroupType();
this.memberConverters = new Converter[parquetGroup.getFieldCount()];
Expand All @@ -416,15 +447,19 @@ public AvroUnionConverter(
Schema memberSchema = avroSchema.getTypes().get(index);
if (!memberSchema.getType().equals(Schema.Type.NULL)) {
Type memberType = parquetGroup.getType(parquetIndex);
memberConverters[parquetIndex] =
newConverter(memberSchema, memberType, model, new ParentValueContainer() {
memberConverters[parquetIndex] = newConverter(
memberSchema,
memberType,
model,
new ParentValueContainer() {
@Override
public void add(Object value) {
Preconditions.checkArgument(
memberValue == null, "Union is resolving to more than one type");
memberValue = value;
}
});
},
conf);
parquetIndex++; // Note for nulls the parquetIndex id not increased
}
}
Expand Down Expand Up @@ -452,10 +487,15 @@ static final class MapConverter<V> extends GroupConverter {
private final Converter keyValueConverter;
private Map<String, V> map;

public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) {
public MapConverter(
ParentValueContainer parent,
GroupType mapType,
Schema mapSchema,
GenericData model,
ParquetConfiguration conf) {
this.parent = parent;
GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model);
this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model, conf);
}

@Override
Expand All @@ -480,7 +520,8 @@ final class MapKeyValueConverter extends GroupConverter {
private final Converter keyConverter;
private final Converter valueConverter;

public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) {
public MapKeyValueConverter(
GroupType keyValueType, Schema mapSchema, GenericData model, ParquetConfiguration conf) {
keyConverter = new PrimitiveConverter() {
@Override
public final void addBinary(Binary value) {
Expand All @@ -490,13 +531,18 @@ public final void addBinary(Binary value) {

Type valueType = keyValueType.getType(1);
Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
@Override
@SuppressWarnings("unchecked")
public void add(Object value) {
MapKeyValueConverter.this.value = (V) value;
}
});
valueConverter = newConverter(
nonNullValueSchema,
valueType,
model,
new ParentValueContainer() {
@Override
@SuppressWarnings("unchecked")
public void add(Object value) {
MapKeyValueConverter.this.value = (V) value;
}
},
conf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ public RecordMaterializer<T> prepareForRead(
GenericData model = getDataModel(configuration, avroSchema);
String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
if (Boolean.parseBoolean(compatEnabled)) {
return newCompatMaterializer(parquetSchema, avroSchema, model);
return newCompatMaterializer(parquetSchema, avroSchema, model, configuration);
}
return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, model);
return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, model, configuration);
}

@SuppressWarnings("unchecked")
private static <T> RecordMaterializer<T> newCompatMaterializer(
MessageType parquetSchema, Schema avroSchema, GenericData model) {
return (RecordMaterializer<T>) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model);
MessageType parquetSchema, Schema avroSchema, GenericData model, ParquetConfiguration conf) {
return (RecordMaterializer<T>) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model, conf);
}

private GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
Expand Down
Loading
Loading