Skip to content

Commit

Permalink
#34009 avro generic record to beam row conversion added support for a…
Browse files Browse the repository at this point in the history
…ll logical types and conversions
  • Loading branch information
wollowizard authored and alfredo-scaccialepre-edo committed Feb 19, 2025
1 parent 7f08bba commit d5a383b
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 40 deletions.
2 changes: 1 addition & 1 deletion buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dependencies {
implementation("com.github.spotbugs.snom:spotbugs-gradle-plugin:5.0.14")

runtimeOnly("com.google.protobuf:protobuf-gradle-plugin:0.8.13") // Enable proto code generation
runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.9.1") // Enable Avro code generation
runtimeOnly("com.github.davidmc24.gradle.plugin:gradle-avro-plugin:1.1.0") // Enable Avro code generation. Version 1.1.0 is the last supporting avro 1.10.2
runtimeOnly("com.diffplug.spotless:spotless-plugin-gradle:5.6.1") // Enable a code formatting plugin
runtimeOnly("gradle.plugin.com.dorongold.plugins:task-tree:1.5") // Adds a 'taskTree' task to print task dependency tree
runtimeOnly("gradle.plugin.com.github.johnrengelman:shadow:7.1.1") // Enable shading Java dependencies
Expand Down
5 changes: 4 additions & 1 deletion sdks/java/extensions/avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation library.java.error_prone_annotations
implementation library.java.avro
implementation library.java.joda_time
implementation 'org.apache.commons:commons-lang3'
testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) {
// Exclude Avro dependencies from "core" since Avro support moved to this extension
exclude group: "org.apache.avro", module: "avro"
Expand Down Expand Up @@ -143,9 +144,11 @@ avroVersions.each { k, v ->
main = "org.apache.avro.tool.Main"
args = [
"compile",
"-bigDecimal", // Use BigDecimal for logical type decimal, similarly to what gradle-avro-plugin does
"schema",
"src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc",
"src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc",
"src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/logicaltypes/logical-types.avsc",
"build/generated/sources/avro$k/test/java"
]
}
Expand All @@ -161,4 +164,4 @@ static def createTaskNames(Map<String, String> prefixMap, String suffix) {
return prefixMap.keySet().stream()
.map { version -> "avroVersion${version}${suffix}" }
.collect(Collectors.toList())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand All @@ -49,6 +50,7 @@
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand All @@ -61,6 +63,7 @@
import org.apache.avro.reflect.AvroName;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.schemas.AvroRecordSchema;
Expand Down Expand Up @@ -97,6 +100,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -154,6 +158,15 @@ public class AvroUtils {
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);

// Shared GenericData that the strict Avro-to-Beam conversions use when the caller does not
// supply one of its own.
private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();

static {
  // Registration order is kept deliberately: the conversions contributed by
  // addLogicalTypeConversions go in first, the decimal conversion is registered afterwards.
  addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
  GENERIC_DATA_WITH_DEFAULT_CONVERSIONS.addLogicalTypeConversion(
      new Conversions.DecimalConversion());
}

// contains workarounds for third-party methods that accept nullable arguments but lack proper
// annotations
private static class NullnessCheckerWorkarounds {
Expand Down Expand Up @@ -552,23 +565,43 @@ public static org.apache.avro.Schema toAvroSchema(Schema beamSchema) {
* Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
* conversion. If Schema is not provided, one is inferred from the AVRO schema.
*/
public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) {
public static Row toBeamRowStrict(
GenericRecord record, @Nullable Schema schema, @Nullable GenericData genericData) {
if (schema == null) {
schema = toBeamSchema(record.getSchema());
}

if (genericData == null) {
if (record instanceof SpecificRecordBase) {
// in case of SpecificRecord, the MODEL$ GenericData already has registered the specific
// conversions
genericData = getGenericData((SpecificRecordBase) record);
} else {
genericData = GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
}
}

Row.Builder builder = Row.withSchema(schema);
org.apache.avro.Schema avroSchema = record.getSchema();

for (Field field : schema.getFields()) {
Object value = record.get(field.getName());
org.apache.avro.Schema fieldAvroSchema = avroSchema.getField(field.getName()).schema();
builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, field.getType()));
builder.addValue(
convertAvroFieldStrict(value, fieldAvroSchema, field.getType(), genericData));
}

return builder.build();
}

/**
 * Strict conversion from AVRO to Beam without an explicit {@link GenericData}. Strict means no
 * widening or narrowing is performed during conversion. If no Beam schema is provided, one is
 * inferred from the AVRO schema of the record.
 *
 * @param record the AVRO record to convert
 * @param schema target Beam schema, or {@code null} to infer it from the record
 * @return the converted {@link Row}
 */
public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) {
  // A null GenericData lets the three-argument overload choose one based on the record type.
  final @Nullable GenericData unspecified = null;
  return toBeamRowStrict(record, schema, unspecified);
}

/**
* Convert from a Beam Row to an AVRO GenericRecord. The Avro Schema is inferred from the Beam
* schema on the row.
Expand Down Expand Up @@ -1323,51 +1356,91 @@ private static org.apache.avro.Schema getFieldSchema(
}
}

/**
 * Converts a value whose AVRO schema carries a logical type into the value stored in a Beam
 * {@link Row}.
 *
 * <p>If {@code genericData} has a {@link Conversion} registered for the value's class and the
 * logical type, the value is first converted back to its raw AVRO representation; the raw value
 * is then validated and mapped per logical type.
 *
 * @param value the AVRO value; may be a raw type or a converted (logical) type
 * @param avroSchema AVRO schema of the value
 * @param fieldType target Beam field type
 * @param genericData source of the registered logical type conversions
 * @return the converted value, or {@code null} when {@code value} is {@code null}, the schema
 *     has no logical type, or the logical type is not handled here
 * @throws IllegalArgumentException if the raw value does not have the class expected for the
 *     logical type
 */
private static Object convertLogicalType(
    @PolyNull Object value,
    @Nonnull org.apache.avro.Schema avroSchema,
    @Nonnull FieldType fieldType,
    @Nonnull GenericData genericData) {
  // The parameter is declared @PolyNull, so do not dereference it before checking.
  if (value == null) {
    return null;
  }

  TypeWithNullability type = new TypeWithNullability(avroSchema);
  LogicalType logicalType = LogicalTypes.fromSchema(type.type);
  if (logicalType == null) {
    return null;
  }

  Object rawType = value;

  Conversion<?> conversion = genericData.getConversionByClass(value.getClass(), logicalType);
  Class<?> convertedType = null;
  if (conversion != null) {
    convertedType = conversion.getConvertedType();
    if (convertedType.isInstance(value)) {
      // Convert against type.type, the same schema the logical type was read from, rather than
      // avroSchema, which may still be a wrapper around it (presumably a nullable union --
      // confirm against TypeWithNullability).
      rawType = Conversions.convertToRawType(value, type.type, logicalType, conversion);
    }
  }

  if (logicalType instanceof LogicalTypes.Date) {
    return convertDateStrict(
        checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType),
        fieldType);
  } else if (logicalType instanceof LogicalTypes.TimeMillis) {
    return checkRawType(Integer.class, value, logicalType, rawType, conversion, convertedType);
  } else if (logicalType instanceof LogicalTypes.TimeMicros) {
    return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType);
  } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
    return convertDateTimeStrict(
        checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType),
        fieldType);
  } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
    return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType);
  } else if ("local-timestamp-millis".equals(logicalType.getName())) {
    // Matched by name rather than by class.
    return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType);
  } else if ("local-timestamp-micros".equals(logicalType.getName())) {
    return checkRawType(Long.class, value, logicalType, rawType, conversion, convertedType);
  } else if (logicalType instanceof LogicalTypes.Decimal) {
    if (rawType instanceof GenericFixed) {
      // Decimal can be backed by ByteBuffer or GenericFixed. in case of GenericFixed, we convert
      // it to ByteBuffer here
      rawType = ByteBuffer.wrap(((GenericFixed) rawType).bytes());
    }
    ByteBuffer byteBuffer =
        checkRawType(ByteBuffer.class, value, logicalType, rawType, conversion, convertedType);
    Conversion<BigDecimal> decimalConversion = new Conversions.DecimalConversion();
    // duplicate() so that decoding does not move the position of the caller's buffer.
    BigDecimal bigDecimal =
        decimalConversion.fromBytes(byteBuffer.duplicate(), type.type, logicalType);
    return convertDecimal(bigDecimal, fieldType);
  } else if (LogicalTypes.uuid().equals(logicalType)) {
    // Round-trip through UUID so a malformed string is rejected; the result stays a String.
    return UUID.fromString(rawType.toString()).toString();
  }
  return null;
}

/**
* Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
* conversion.
*
* @param value {@link GenericRecord} or any nested value
* @param avroSchema schema for value
* @param fieldType target beam field type
* @param genericData {@link GenericData} instance to use for conversions
* @return value converted for {@link Row}
*/
@SuppressWarnings("unchecked")
public static @PolyNull Object convertAvroFieldStrict(
@PolyNull Object value,
@Nonnull org.apache.avro.Schema avroSchema,
@Nonnull FieldType fieldType) {
@Nonnull FieldType fieldType,
@Nonnull GenericData genericData) {

if (value == null) {
return null;
}
Object convertedLogicalType = convertLogicalType(value, avroSchema, fieldType, genericData);

TypeWithNullability type = new TypeWithNullability(avroSchema);
LogicalType logicalType = LogicalTypes.fromSchema(type.type);
if (logicalType != null) {
if (logicalType instanceof LogicalTypes.Decimal) {
ByteBuffer byteBuffer = (ByteBuffer) value;
BigDecimal bigDecimal =
new Conversions.DecimalConversion()
.fromBytes(byteBuffer.duplicate(), type.type, logicalType);
return convertDecimal(bigDecimal, fieldType);
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
if (value instanceof ReadableInstant) {
return convertDateTimeStrict(((ReadableInstant) value).getMillis(), fieldType);
} else {
return convertDateTimeStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.Date) {
if (value instanceof ReadableInstant) {
int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
return convertDateStrict(epochDays, fieldType);
} else if (value instanceof java.time.LocalDate) {
return convertDateStrict((int) ((java.time.LocalDate) value).toEpochDay(), fieldType);
} else {
return convertDateStrict((Integer) value, fieldType);
}
}
if (convertedLogicalType != null) {
return convertedLogicalType;
}

TypeWithNullability type = new TypeWithNullability(avroSchema);

switch (type.type.getType()) {
case FIXED:
return convertFixedStrict((GenericFixed) value, fieldType);
Expand Down Expand Up @@ -1402,14 +1475,15 @@ private static org.apache.avro.Schema getFieldSchema(
return convertEnumStrict(value, fieldType);

case ARRAY:
return convertArrayStrict((List<Object>) value, type.type.getElementType(), fieldType);
return convertArrayStrict(
(List<Object>) value, type.type.getElementType(), fieldType, genericData);

case MAP:
return convertMapStrict(
(Map<CharSequence, Object>) value, type.type.getValueType(), fieldType);
(Map<CharSequence, Object>) value, type.type.getValueType(), fieldType, genericData);

case UNION:
return convertUnionStrict(value, type.type, fieldType);
return convertUnionStrict(value, type.type, fieldType, genericData);

case NULL:
throw new IllegalArgumentException("Can't convert 'null' to non-nullable field");
Expand All @@ -1419,6 +1493,24 @@ private static org.apache.avro.Schema getFieldSchema(
}
}

/**
 * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
 * conversion. This overload delegates to the four-argument variant using the class-level
 * default {@link GenericData} instance.
 *
 * @param value {@link GenericRecord} or any nested value
 * @param avroSchema schema for value
 * @param fieldType target beam field type
 * @return value converted for {@link Row}
 */
public static @PolyNull Object convertAvroFieldStrict(
    @PolyNull Object value,
    @Nonnull org.apache.avro.Schema avroSchema,
    @Nonnull FieldType fieldType) {
  // No unchecked operation happens here, so no warning suppression is needed on this overload.
  return convertAvroFieldStrict(
      value, avroSchema, fieldType, GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
}

private static Object convertRecordStrict(GenericRecord record, FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), TypeName.ROW, "record");
return toBeamRowStrict(record, fieldType.getRowSchema());
Expand Down Expand Up @@ -1495,27 +1587,35 @@ private static Object convertEnumStrict(Object value, FieldType fieldType) {
}

private static Object convertUnionStrict(
Object value, org.apache.avro.Schema unionAvroSchema, FieldType fieldType) {
Object value,
org.apache.avro.Schema unionAvroSchema,
FieldType fieldType,
GenericData genericData) {
checkTypeName(fieldType.getTypeName(), TypeName.LOGICAL_TYPE, "oneOfType");
checkArgument(
checkNotNull(fieldType.getLogicalType()).getIdentifier().equals(OneOfType.IDENTIFIER));
OneOfType oneOfType = fieldType.getLogicalType(OneOfType.class);
int fieldNumber = GenericData.get().resolveUnion(unionAvroSchema, value);
FieldType baseFieldType = oneOfType.getOneOfSchema().getField(fieldNumber).getType();
Object convertedValue =
convertAvroFieldStrict(value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType);
convertAvroFieldStrict(
value, unionAvroSchema.getTypes().get(fieldNumber), baseFieldType, genericData);
return oneOfType.createValue(fieldNumber, convertedValue);
}

private static Object convertArrayStrict(
List<Object> values, org.apache.avro.Schema elemAvroSchema, FieldType fieldType) {
List<Object> values,
org.apache.avro.Schema elemAvroSchema,
FieldType fieldType,
GenericData genericData) {
checkTypeName(fieldType.getTypeName(), TypeName.ARRAY, "array");

List<Object> ret = new ArrayList<>(values.size());
FieldType elemFieldType = fieldType.getCollectionElementType();

for (Object value : values) {
ret.add(convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType)));
ret.add(
convertAvroFieldStrict(value, elemAvroSchema, checkNotNull(elemFieldType), genericData));
}

return ret;
Expand All @@ -1524,7 +1624,8 @@ private static Object convertArrayStrict(
private static Object convertMapStrict(
Map<CharSequence, Object> values,
org.apache.avro.Schema valueAvroSchema,
FieldType fieldType) {
FieldType fieldType,
GenericData genericData) {
checkTypeName(fieldType.getTypeName(), TypeName.MAP, "map");
FieldType mapKeyType = checkNotNull(fieldType.getMapKeyType());
FieldType mapValueType = checkNotNull(fieldType.getMapValueType());
Expand All @@ -1539,7 +1640,7 @@ private static Object convertMapStrict(
for (Map.Entry<CharSequence, Object> value : values.entrySet()) {
ret.put(
convertStringStrict(value.getKey(), mapKeyType),
convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType));
convertAvroFieldStrict(value.getValue(), valueAvroSchema, mapValueType, genericData));
}

return ret;
Expand All @@ -1563,4 +1664,45 @@ private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
hiveLogicalType, size);
return new org.apache.avro.Schema.Parser().parse(schemaJson);
}

/**
 * Returns the {@link GenericData} model associated with a specific record.
 *
 * <p>The record's own accessor is tried first; if that method does not exist at runtime, the
 * static {@code MODEL$} field of the record's class is read reflectively instead.
 *
 * @param record the specific record whose model is wanted
 * @return the model of the record
 * @throws IllegalArgumentException if the {@code MODEL$} field cannot be accessed
 */
private static GenericData getGenericData(SpecificRecordBase record) {
  GenericData model;
  try {
    model = record.getSpecificData();
  } catch (NoSuchMethodError accessorMissing) {
    // SpecificRecordBase.getSpecificData() is not available in older Avro releases (the
    // original note names "avro 182", presumably 1.8.2), so fall back to the static field.
    final Object staticModel;
    try {
      staticModel = FieldUtils.readStaticField(record.getClass(), "MODEL$", true);
    } catch (IllegalAccessException accessDenied) {
      throw new IllegalArgumentException(
          "Unable to access MODEL$ field in SpecificRecordBase class", accessDenied);
    }
    model = (GenericData) staticModel;
  }
  return model;
}

/**
 * Verifies that the raw AVRO value backing a logical type has the expected class and returns it
 * cast to that class.
 *
 * @param desiredRawType class the raw value must be an instance of
 * @param value the original value, used only for the error message
 * @param logicalType the logical type being converted, used only for the error message
 * @param rawType the raw value to check
 * @param conversion the conversion that was looked up (may be {@code null}), used only for the
 *     error message
 * @param convertedType the converted type of that conversion (may be {@code null}), used only
 *     for the error message
 * @return {@code rawType} cast to {@code desiredRawType}
 * @throws IllegalArgumentException if {@code rawType} is not an instance of
 *     {@code desiredRawType}
 */
private static <T> T checkRawType(
    Class<T> desiredRawType,
    Object value,
    LogicalType logicalType,
    Object rawType,
    Conversion<?> conversion,
    Class<?> convertedType) {
  if (!desiredRawType.isInstance(rawType)) {
    // Build the message only on failure: formatting calls toString() on arbitrary values and
    // this check runs for every logical-type field that is converted.
    throw new IllegalArgumentException(
        String.format(
            "Value %s of class %s is not a supported type for logical type %s. "
                + "Underlying avro built-in raw type should be instance of %s. "
                + "However it is instance of %s and has value %s ."
                + "Generic data has conversion %s, convertedType %s",
            value,
            value == null ? null : value.getClass(),
            logicalType,
            desiredRawType,
            rawType == null ? null : rawType.getClass(),
            rawType,
            conversion,
            convertedType));
  }
  // Checked cast; no unchecked warning and no heap pollution.
  return desiredRawType.cast(rawType);
}
}
Loading

0 comments on commit d5a383b

Please sign in to comment.