[SUPPORT] Can't redefine: array #12751

Open · DontSingRus opened this issue Jan 31, 2025 · 3 comments

Comments

@DontSingRus

Describe the problem you faced

I have a protobuf source. After parsing it into a PySpark DataFrame with a fixed structure, I start writing to a Hudi table. The first batch of data writes successfully; the second write fails with: org.apache.avro.SchemaParseException: Can't redefine: array

DataFrame schema (output of df.printSchema()):

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- col_1: string (nullable = true)
 |-- col_2: string (nullable = true)
 |-- col_3: string (nullable = true)
 |-- col_4: string (nullable = true)
 |-- col_5: string (nullable = true)
 |-- col_6: string (nullable = true)
 |-- col_7: boolean (nullable = true)
 |-- col_8: string (nullable = true)
 |-- col_9: string (nullable = true)
 |-- col_10: string (nullable = true)
 |-- col_11: boolean (nullable = true)
 |-- col_12: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- col_2: string (nullable = true)
 |    |    |-- col_8: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- event_idempotency: string (nullable = true)
 |-- col_13: string (nullable = true)
 |-- col_14: string (nullable = true)
 |-- body: struct (nullable = true)
 |    |-- col_15: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- col_16: string (nullable = true)
 |    |    |    |-- col_12: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- col_2: string (nullable = true)
 |    |    |    |    |    |-- col_8: string (nullable = true)
 |    |-- col_17: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- col_18: boolean (nullable = true)
 |-- col_19: string (nullable = true)
 |-- col_20: string (nullable = true)
 |-- load_timestamp: timestamp (nullable = false)
 |-- date: string (nullable = true)

Write options:

'hoodie.table.name': 'table_name',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'date',
'hoodie.datasource.write.table.name': 'table_name',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'etl__ods_load_timestamp',
'hoodie.upsert.shuffle.parallelism': 32,
'hoodie.insert.shuffle.parallelism': 32,
'hoodie.clustering.async.enabled': 'true'
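
For reference, the write itself is the standard Hudi datasource call. A minimal sketch, assuming the options above are collected in a dict named hudi_options, the DataFrame printed above is df, and the bucket/path is a placeholder:

df.write.format('hudi') \
    .options(**hudi_options) \
    .mode('append') \
    .save('s3://my-bucket/warehouse/table_name')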

To Reproduce

I couldn't reproduce the problem. Locally, and on a similar server with the same data, everything writes successfully. On the same server where the error occurred, the same code with different data and a different schema also runs successfully.

Expected behavior

I expected the write to succeed, as it did before.

Environment Description

  • Hudi version : 0.15.0

  • Spark version : 3.0.3

  • Storage (HDFS/S3/GCS..) : s3

  • Running on Docker? (yes/no) : no

  • jars: parquet-common-1.10.1, parquet-hadoop-1.10.1, spark-avro_2.12-3.0.3

Stacktrace

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1073)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:287)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :469
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:344)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:254)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: array
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:151)
	at org.apache.hudi.table.HoodieSparkTable.runMerge(HoodieSparkTable.java:148)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:376)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:371)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:337)
	... 28 more
Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
	at org.apache.avro.Schema$Names.put(Schema.java:1128)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
	at org.apache.avro.Schema.toString(Schema.java:324)
	at org.apache.avro.Schema.toString(Schema.java:314)
	at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
	at org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:170)
	at org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:101)
	at org.apache.hudi.io.hadoop.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:80)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:128)
	... 32 more
@rangareddy

@DontSingRus

This issue is similar to the ones described in #7717 and #7145.

It appears to be caused by a parquet-avro library version mismatch. Could you please try using the latest version of Spark (e.g., 3.2 or higher) to see if that resolves the issue?
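
For context on the error itself: older parquet-avro versions reportedly name every list-element record they generate "array", so a schema with more than one array-of-struct column (here, col_12 appears both at the top level and nested inside body) can end up with two different Avro records sharing one name, which Avro refuses to serialize. A minimal sketch of that class of failure using the standalone avro Python package; the duplicate name "item" is used instead of "array" only because Python's parser reserves the literal type names:

import avro.errors
import avro.schema  # pip install avro

# Two array fields whose element records share the name "item" but differ
# in fields -- analogous to the duplicate "array" records that old
# parquet-avro generates for multiple array<struct> columns.
duplicate_name_schema = '''
{
  "type": "record", "name": "top", "fields": [
    {"name": "col_12", "type": {"type": "array", "items":
      {"type": "record", "name": "item",
       "fields": [{"name": "col_2", "type": "string"}]}}},
    {"name": "col_15", "type": {"type": "array", "items":
      {"type": "record", "name": "item",
       "fields": [{"name": "col_16", "type": "string"}]}}}
  ]
}
'''

try:
    avro.schema.parse(duplicate_name_schema)
except avro.errors.SchemaParseException as e:
    print(e)  # duplicate-name error; exact wording varies by avro version

Newer parquet-avro releases disambiguate these generated names, which is presumably why newer Spark versions do not hit this.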

@DontSingRus (Author)

DontSingRus commented Feb 3, 2025

@rangareddy, thank you for the answer!
I will test with Spark 3.5 today. But what interests me more is how to fix it on Spark 3.0.3: would updating only parquet-common-1.10.1 and parquet-hadoop-1.10.1 be enough? I was thinking of something along the lines of the sketch below.
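
(Jar versions and paths here are illustrative assumptions, not a tested combination; parquet-avro is included because it is the library that appears in the stacktrace:)

from pyspark.sql import SparkSession

# Hypothetical paths/versions: pin newer parquet jars ahead of the ones
# bundled with Spark 3.0.3. Whether this is sufficient, and whether the
# newer jars are binary-compatible, is exactly the open question.
extra_jars = ':'.join([
    '/opt/jars/parquet-common-1.11.2.jar',
    '/opt/jars/parquet-hadoop-1.11.2.jar',
    '/opt/jars/parquet-avro-1.11.2.jar',
])

spark = (SparkSession.builder
    .appName('hudi_parquet_upgrade_test')
    .config('spark.driver.extraClassPath', extra_jars)
    .config('spark.executor.extraClassPath', extra_jars)
    .getOrCreate())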

@rangareddy

Hi @DontSingRus

Is there any update from testing with Spark 3.5? If it works, we will close this issue.
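
In the meantime, it may help to confirm which parquet version the failing job actually loads. A quick check from the PySpark shell (org.apache.parquet.Version ships in parquet-common and is reachable through the py4j gateway; assumes an active session named spark):

print(spark.version)                                       # e.g. 3.0.3
print(spark._jvm.org.apache.parquet.Version.FULL_VERSION)  # parquet version on the driver classpath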
